Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/24 08:38:22 UTC
git commit: TEZ-490. Rename SimpleInput / SimpleOutput to be MR
specific (part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 c5a8a3c6e -> 5d86b9350
TEZ-490. Rename SimpleInput / SimpleOutput to be MR specific (part of
TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5d86b935
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5d86b935
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5d86b935
Branch: refs/heads/TEZ-398
Commit: 5d86b9350555819b26110c200c8f3cdda6893020
Parents: c5a8a3c
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 23 23:37:53 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 23 23:37:53 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 20 +-
.../broadcast/input/BroadcastKVReader.java | 2 +-
.../apache/hadoop/mapred/LocalJobRunnerTez.java | 8 +-
.../org/apache/tez/mapreduce/input/MRInput.java | 438 +++++++++++++++++++
.../tez/mapreduce/input/MRInputLegacy.java | 36 ++
.../apache/tez/mapreduce/input/SimpleInput.java | 438 -------------------
.../tez/mapreduce/input/SimpleInputLegacy.java | 36 --
.../apache/tez/mapreduce/output/MROutput.java | 326 ++++++++++++++
.../tez/mapreduce/output/SimpleOutput.java | 326 --------------
.../apache/tez/mapreduce/processor/MRTask.java | 10 +-
.../tez/mapreduce/processor/MRTaskReporter.java | 2 +-
.../mapreduce/processor/map/MapProcessor.java | 50 +--
.../processor/reduce/ReduceProcessor.java | 6 +-
.../processor/map/TestMapProcessor.java | 6 +-
.../processor/reduce/TestReduceProcessor.java | 8 +-
15 files changed, 856 insertions(+), 856 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index f32fa6b..1967462 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -85,8 +85,8 @@ import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.TokenCache;
import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -497,20 +497,20 @@ public class YarnTezDagChild {
// FIXME need Input/Output vertices else we have this hack
if (taskSpec.getInputs().isEmpty()) {
- InputDescriptor simpleInputDesc =
- new InputDescriptor(SimpleInputLegacy.class.getName());
- simpleInputDesc.setUserPayload(
+ InputDescriptor mrInputDesc =
+ new InputDescriptor(MRInputLegacy.class.getName());
+ mrInputDesc.setUserPayload(
taskSpec.getProcessorDescriptor().getUserPayload());
taskSpec.getInputs().add(
- new InputSpec("null", simpleInputDesc, 0));
+ new InputSpec("null", mrInputDesc, 0));
}
if (taskSpec.getOutputs().isEmpty()) {
- OutputDescriptor simpleOutputDesc =
- new OutputDescriptor(SimpleOutput.class.getName());
- simpleOutputDesc.setUserPayload(
+ OutputDescriptor mrOutputDesc =
+ new OutputDescriptor(MROutput.class.getName());
+ mrOutputDesc.setUserPayload(
taskSpec.getProcessorDescriptor().getUserPayload());
taskSpec.getOutputs().add(
- new OutputSpec("null", simpleOutputDesc, 0));
+ new OutputSpec("null", mrOutputDesc, 0));
}
String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
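Note on the hunk above: the fallback reuses the processor's user payload for the injected MRInputLegacy / MROutput descriptors; MRInput.initialize() later rebuilds a JobConf from that payload via TezUtils.createConfFromUserPayload. A hedged sketch of building such a payload and descriptor explicitly (TezUtils.createUserPayloadFromConf is assumed here as the inverse helper; that name and the surrounding variables are illustrative, not part of this commit):

    // Sketch: constructing a descriptor the same way the fallback does,
    // but from a client-supplied Configuration.
    // Assumption: createUserPayloadFromConf is the inverse of the
    // createConfFromUserPayload call seen in MRInput.initialize().
    byte[] payload = TezUtils.createUserPayloadFromConf(new JobConf(conf));
    InputDescriptor inputDesc =
        new InputDescriptor(MRInputLegacy.class.getName());
    inputDesc.setUserPayload(payload);
    taskSpec.getInputs().add(new InputSpec("null", inputDesc, 0));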
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
index 0b86a8e..2c53e75 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -187,7 +187,7 @@ public class BroadcastKVReader<K, V> implements KVReader {
- // TODO NEWTEZ Move this into a common class. Also used in SImpleInput
+ // TODO NEWTEZ Move this into a common class. Also used in MRInput
private class SimpleValueIterator implements Iterator<V> {
private V value;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index f59e836..f2b0a38 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -83,8 +83,8 @@
//import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
//import org.apache.tez.mapreduce.hadoop.IDConverter;
//import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-//import org.apache.tez.mapreduce.input.SimpleInput;
-//import org.apache.tez.mapreduce.output.SimpleOutput;
+//import org.apache.tez.mapreduce.input.MRInput;
+//import org.apache.tez.mapreduce.output.MROutput;
//import org.apache.tez.mapreduce.processor.map.MapProcessor;
//import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
//
@@ -252,7 +252,7 @@
// tezMapId, user, localConf.getJobName(), "TODO_vertexName",
// mapProcessorDesc,
// Collections.singletonList(new InputSpec("srcVertex", 0,
-// SimpleInput.class.getName())),
+// MRInput.class.getName())),
// Collections.singletonList(new OutputSpec("tgtVertex", 0,
// LocalOnFileSorterOutput.class.getName())));
//
@@ -458,7 +458,7 @@
// Collections.singletonList(new InputSpec("TODO_srcVertexName",
// mapIds.size(), LocalMergedInput.class.getName())),
// Collections.singletonList(new OutputSpec("TODO_targetVertex",
-// 0, SimpleOutput.class.getName())));
+// 0, MROutput.class.getName())));
//
// // move map output to reduce input
// for (int i = 0; i < mapIds.size(); i++) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
new file mode 100644
index 0000000..6066d93
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Event;
+import org.apache.tez.engine.api.KVReader;
+import org.apache.tez.engine.api.LogicalInput;
+import org.apache.tez.engine.api.TezInputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link MRInput} is an {@link Input} which provides key/value pairs
+ * for the consumer.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce
+ * {@link InputFormat} implementations.
+ */
+
+public class MRInput implements LogicalInput {
+
+ private static final Log LOG = LogFactory.getLog(MRInput.class);
+
+
+ private TezInputContext inputContext;
+
+ private JobConf jobConf;
+ private Configuration incrementalConf;
+ private boolean recordReaderCreated = false;
+
+ boolean useNewApi;
+
+ org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+ @SuppressWarnings("rawtypes")
+ private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
+ @SuppressWarnings("rawtypes")
+ private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+ protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+
+ @SuppressWarnings("rawtypes")
+ private InputFormat oldInputFormat;
+ @SuppressWarnings("rawtypes")
+ protected RecordReader oldRecordReader;
+
+ protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
+
+ private TezCounter inputRecordCounter;
+ private TezCounter fileInputByteCounter;
+ private List<Statistics> fsStats;
+
+ @Override
+ public List<Event> initialize(TezInputContext inputContext) throws IOException {
+ this.inputContext = inputContext;
+ Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+ this.jobConf = new JobConf(conf);
+
+ // Read split information.
+ TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+ TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
+ this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+ thisTaskMetaInfo.getStartOffset());
+
+ // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
+ // theory, can be used by the MapProcessor, ReduceProcessor or a custom
+ // processor. (The processor could provide the counter though)
+ this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
+ this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
+
+ useNewApi = this.jobConf.getUseNewMapper();
+
+ if (useNewApi) {
+ TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
+ Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
+ try {
+ inputFormatClazz = taskAttemptContext.getInputFormatClass();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Unable to instantiate InputFormat class", e);
+ }
+
+ newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+
+ newInputSplit = getNewSplitDetails(splitMetaInfo);
+
+ List<Statistics> matchedStats = null;
+ if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+ matchedStats = Utils.getFsStatistics(
+ ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
+ newInputSplit).getPath(), this.jobConf);
+ }
+ fsStats = matchedStats;
+
+ try {
+ newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+ newRecordReader.initialize(newInputSplit, taskAttemptContext);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while creating record reader", e);
+ }
+ } else { // OLD API
+ oldInputFormat = this.jobConf.getInputFormat();
+ InputSplit oldInputSplit =
+ getOldSplitDetails(splitMetaInfo);
+
+
+ List<Statistics> matchedStats = null;
+ if (oldInputSplit instanceof FileSplit) {
+ matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
+ }
+ fsStats = matchedStats;
+
+ long bytesInPrev = getInputBytes();
+ oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
+ this.jobConf, new MRReporter(inputContext, oldInputSplit));
+ long bytesInCurr = getInputBytes();
+ fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+ setIncrementalConfigParams(oldInputSplit);
+ }
+ return null;
+ }
+
+ @Override
+ public KVReader getReader() throws IOException {
+ Preconditions
+ .checkState(recordReaderCreated == false,
+ "Only a single instance of record reader can be created for this input.");
+ recordReaderCreated = true;
+ return new MRInputKVReader();
+ }
+
+
+ @Override
+ public void handleEvents(List<Event> inputEvents) {
+ // Not expecting any events at the moment.
+ }
+
+
+ @Override
+ public void setNumPhysicalInputs(int numInputs) {
+ // Not required at the moment. May be required if splits are sent via events.
+ }
+
+ @Override
+ public List<Event> close() throws IOException {
+ long bytesInPrev = getInputBytes();
+ if (useNewApi) {
+ newRecordReader.close();
+ } else {
+ oldRecordReader.close();
+ }
+ long bytesInCurr = getInputBytes();
+ fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+
+ return null;
+ }
+
+ /**
+ * {@link MRInput} sets some additional parameters like split location when using
+ * the new API. This method returns the list of additional updates, and
+ * should be used by Processors using the old MapReduce API with {@link MRInput}.
+ *
+ * @return the additional fields set by {@link MRInput}
+ */
+ public Configuration getConfigUpdates() {
+ return new Configuration(incrementalConf);
+ }
+
+ public float getProgress() throws IOException, InterruptedException {
+ if (useNewApi) {
+ return newRecordReader.getProgress();
+ } else {
+ return oldRecordReader.getProgress();
+ }
+ }
+
+
+ private TaskAttemptContext createTaskAttemptContext() {
+ return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
+ }
+
+
+ private static class SimpleValueIterator implements Iterator<Object> {
+
+ private Object value;
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
+
+ public boolean hasNext() {
+ return value != null;
+ }
+
+ public Object next() {
+ Object value = this.value;
+ this.value = null;
+ return value;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class SimpleIterable implements Iterable<Object> {
+ private final Iterator<Object> iterator;
+ public SimpleIterable(Iterator<Object> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public Iterator<Object> iterator() {
+ return iterator;
+ }
+ }
+
+
+
+
+ @SuppressWarnings("unchecked")
+ private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
+ throws IOException {
+ Path file = new Path(splitMetaInfo.getSplitLocation());
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ file = fs.makeQualified(file);
+ LOG.info("Reading input split file from : " + file);
+ long offset = splitMetaInfo.getStartOffset();
+
+ FSDataInputStream inFile = fs.open(file);
+ inFile.seek(offset);
+ String className = Text.readString(inFile);
+ Class<org.apache.hadoop.mapred.InputSplit> cls;
+ try {
+ cls =
+ (Class<org.apache.hadoop.mapred.InputSplit>)
+ jobConf.getClassByName(className);
+ } catch (ClassNotFoundException ce) {
+ IOException wrap = new IOException("Split class " + className +
+ " not found");
+ wrap.initCause(ce);
+ throw wrap;
+ }
+ SerializationFactory factory = new SerializationFactory(jobConf);
+ Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer =
+ (Deserializer<org.apache.hadoop.mapred.InputSplit>)
+ factory.getDeserializer(cls);
+ deserializer.open(inFile);
+ org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
+ long pos = inFile.getPos();
+ inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+ .increment(pos - offset);
+ inFile.close();
+ return split;
+ }
+
+ @SuppressWarnings("unchecked")
+ private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
+ TaskSplitIndex splitMetaInfo) throws IOException {
+ Path file = new Path(splitMetaInfo.getSplitLocation());
+ long offset = splitMetaInfo.getStartOffset();
+
+ // Split information read from local filesystem.
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ file = fs.makeQualified(file);
+ LOG.info("Reading input split file from : " + file);
+ FSDataInputStream inFile = fs.open(file);
+ inFile.seek(offset);
+ String className = Text.readString(inFile);
+ Class<org.apache.hadoop.mapreduce.InputSplit> cls;
+ try {
+ cls =
+ (Class<org.apache.hadoop.mapreduce.InputSplit>)
+ jobConf.getClassByName(className);
+ } catch (ClassNotFoundException ce) {
+ IOException wrap = new IOException("Split class " + className +
+ " not found");
+ wrap.initCause(ce);
+ throw wrap;
+ }
+ SerializationFactory factory = new SerializationFactory(jobConf);
+ Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer =
+ (Deserializer<org.apache.hadoop.mapreduce.InputSplit>)
+ factory.getDeserializer(cls);
+ deserializer.open(inFile);
+ org.apache.hadoop.mapreduce.InputSplit split =
+ deserializer.deserialize(null);
+ long pos = inFile.getPos();
+ inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+ .increment(pos - offset);
+ inFile.close();
+ return split;
+ }
+
+ private void setIncrementalConfigParams(InputSplit inputSplit) {
+ if (inputSplit instanceof FileSplit) {
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ this.incrementalConf = new Configuration(false);
+
+ this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
+ .toString());
+ this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
+ fileSplit.getStart());
+ this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
+ fileSplit.getLength());
+ }
+ LOG.info("Processing split: " + inputSplit);
+ }
+
+ private long getInputBytes() {
+ if (fsStats == null) return 0;
+ long bytesRead = 0;
+ for (Statistics stat: fsStats) {
+ bytesRead = bytesRead + stat.getBytesRead();
+ }
+ return bytesRead;
+ }
+
+ protected TaskSplitMetaInfo[] readSplits(Configuration conf)
+ throws IOException {
+ TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+ allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
+ FileSystem.getLocal(conf));
+ return allTaskSplitMetaInfo;
+ }
+
+ private class MRInputKVReader implements KVReader {
+
+ Object key;
+ Object value;
+
+ private SimpleValueIterator valueIterator = new SimpleValueIterator();
+ private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
+
+ private final boolean localNewApi;
+
+ MRInputKVReader() {
+ localNewApi = useNewApi;
+ if (!localNewApi) {
+ key = oldRecordReader.createKey();
+ value = oldRecordReader.createValue();
+ }
+ }
+
+ // Set up the values iterator once, and set the value on the same object each time
+ // to prevent lots of objects being created.
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean next() throws IOException {
+ boolean hasNext = false;
+ long bytesInPrev = getInputBytes();
+ if (localNewApi) {
+ try {
+ hasNext = newRecordReader.nextKeyValue();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while checking for next key-value", e);
+ }
+ } else {
+ hasNext = oldRecordReader.next(key, value);
+ }
+ long bytesInCurr = getInputBytes();
+ fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+
+ if (hasNext) {
+ inputRecordCounter.increment(1);
+ }
+
+ return hasNext;
+ }
+
+ @Override
+ public KVRecord getCurrentKV() throws IOException {
+ KVRecord kvRecord = null;
+ if (localNewApi) {
+ try {
+ valueIterator.setValue(newRecordReader.getCurrentValue());
+ kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while fetching next key-value", e);
+ }
+
+ } else {
+ valueIterator.setValue(value);
+ kvRecord = new KVRecord(key, valueIterable);
+ }
+ return kvRecord;
+ }
+ };
+}
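The new MRInput surfaces records through the KVReader contract shown above: next() advances the underlying record reader (updating the byte and record counters), and getCurrentKV() returns a KVRecord wrapping the current key and a single-value iterable. A minimal read-loop sketch (hedged: the KVRecord nesting inside KVReader follows this branch's engine API as far as this diff shows; the processor context and the process() handler are illustrative):

    // Sketch of a processor-side read loop against MRInput.
    // 'input' is the initialized MRInput handed to the processor;
    // process(...) is a hypothetical per-record handler.
    KVReader reader = input.getReader();   // only one reader may be created
    while (reader.next()) {
      KVReader.KVRecord kvRecord = reader.getCurrentKV();
      process(kvRecord);                   // key plus an Iterable of values
    }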
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
new file mode 100644
index 0000000..5923746
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class MRInputLegacy extends MRInput {
+
+ @Private
+ public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
+ return this.newInputSplit;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Private
+ public RecordReader getOldRecordReader() {
+ return this.oldRecordReader;
+ }
+}
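MRInputLegacy exists so that processors still written against the old MapReduce APIs can bypass the KVReader abstraction and grab the underlying mapred RecordReader, or the new-API split, directly. A hedged usage sketch (the instanceof guard and local names are illustrative, not part of this commit):

    // Sketch: an old-API processor unwrapping MRInputLegacy.
    // 'logicalInput' is the LogicalInput supplied by the framework.
    if (logicalInput instanceof MRInputLegacy) {
      MRInputLegacy mrInput = (MRInputLegacy) logicalInput;
      org.apache.hadoop.mapred.RecordReader oldReader =
          mrInput.getOldRecordReader();            // old-API path
      org.apache.hadoop.mapreduce.InputSplit newSplit =
          mrInput.getNewInputSplit();              // new-API split, if any
      // hand oldReader / newSplit to the legacy map runner as needed
    }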
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
deleted file mode 100644
index 598f801..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.input;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
-import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
-
-import com.google.common.base.Preconditions;
-
-/**
- * {@link SimpleInput} is an {@link Input} which provides key/values pairs
- * for the consumer.
- *
- * It is compatible with all standard Apache Hadoop MapReduce
- * {@link InputFormat} implementations.
- */
-
-public class SimpleInput implements LogicalInput {
-
- private static final Log LOG = LogFactory.getLog(SimpleInput.class);
-
-
- private TezInputContext inputContext;
-
- private JobConf jobConf;
- private Configuration incrementalConf;
- private boolean recordReaderCreated = false;
-
- boolean useNewApi;
-
- org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
-
- @SuppressWarnings("rawtypes")
- private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
- @SuppressWarnings("rawtypes")
- private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
- protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
-
- @SuppressWarnings("rawtypes")
- private InputFormat oldInputFormat;
- @SuppressWarnings("rawtypes")
- protected RecordReader oldRecordReader;
-
- protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
-
- private TezCounter inputRecordCounter;
- private TezCounter fileInputByteCounter;
- private List<Statistics> fsStats;
-
- @Override
- public List<Event> initialize(TezInputContext inputContext) throws IOException {
- this.inputContext = inputContext;
- Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
- this.jobConf = new JobConf(conf);
-
- // Read split information.
- TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
- TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
- this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
- thisTaskMetaInfo.getStartOffset());
-
- // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
- // theory, can be used by the MapProcessor, ReduceProcessor or a custom
- // processor. (The processor could provide the counter though)
- this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
- this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
-
- useNewApi = this.jobConf.getUseNewMapper();
-
- if (useNewApi) {
- TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
- Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
- try {
- inputFormatClazz = taskAttemptContext.getInputFormatClass();
- } catch (ClassNotFoundException e) {
- throw new IOException("Unable to instantiate InputFormat class", e);
- }
-
- newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
-
- newInputSplit = getNewSplitDetails(splitMetaInfo);
-
- List<Statistics> matchedStats = null;
- if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
- matchedStats = Utils.getFsStatistics(
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
- newInputSplit).getPath(), this.jobConf);
- }
- fsStats = matchedStats;
-
- try {
- newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
- newRecordReader.initialize(newInputSplit, taskAttemptContext);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while creating record reader", e);
- }
- } else { // OLD API
- oldInputFormat = this.jobConf.getInputFormat();
- InputSplit oldInputSplit =
- getOldSplitDetails(splitMetaInfo);
-
-
- List<Statistics> matchedStats = null;
- if (oldInputSplit instanceof FileSplit) {
- matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
- }
- fsStats = matchedStats;
-
- long bytesInPrev = getInputBytes();
- oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
- this.jobConf, new MRReporter(inputContext, oldInputSplit));
- long bytesInCurr = getInputBytes();
- fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
- setIncrementalConfigParams(oldInputSplit);
- }
- return null;
- }
-
- @Override
- public KVReader getReader() throws IOException {
- Preconditions
- .checkState(recordReaderCreated == false,
- "Only a single instance of record reader can be created for this input.");
- recordReaderCreated = true;
- return new MRInputKVReader();
- }
-
-
- @Override
- public void handleEvents(List<Event> inputEvents) {
- // Not expecting any events at the moment.
- }
-
-
- @Override
- public void setNumPhysicalInputs(int numInputs) {
- // Not required at the moment. May be required if splits are sent via events.
- }
-
- @Override
- public List<Event> close() throws IOException {
- long bytesInPrev = getInputBytes();
- if (useNewApi) {
- newRecordReader.close();
- } else {
- oldRecordReader.close();
- }
- long bytesInCurr = getInputBytes();
- fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-
- return null;
- }
-
- /**
- * SimpleInputs sets some additional parameters like split location when using
- * the new API. This methods returns the list of additional updates, and
- * should be used by Processors using the old MapReduce API with SimpleInput.
- *
- * @return the additional fields set by SimpleInput
- */
- public Configuration getConfigUpdates() {
- return new Configuration(incrementalConf);
- }
-
- public float getProgress() throws IOException, InterruptedException {
- if (useNewApi) {
- return newRecordReader.getProgress();
- } else {
- return oldRecordReader.getProgress();
- }
- }
-
-
- private TaskAttemptContext createTaskAttemptContext() {
- return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
- }
-
-
- private static class SimpleValueIterator implements Iterator<Object> {
-
- private Object value;
-
- public void setValue(Object value) {
- this.value = value;
- }
-
- public boolean hasNext() {
- return value != null;
- }
-
- public Object next() {
- Object value = this.value;
- this.value = null;
- return value;
- }
-
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class SimpleIterable implements Iterable<Object> {
- private final Iterator<Object> iterator;
- public SimpleIterable(Iterator<Object> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public Iterator<Object> iterator() {
- return iterator;
- }
- }
-
-
-
-
- @SuppressWarnings("unchecked")
- private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
- throws IOException {
- Path file = new Path(splitMetaInfo.getSplitLocation());
- FileSystem fs = FileSystem.getLocal(jobConf);
- file = fs.makeQualified(file);
- LOG.info("Reading input split file from : " + file);
- long offset = splitMetaInfo.getStartOffset();
-
- FSDataInputStream inFile = fs.open(file);
- inFile.seek(offset);
- String className = Text.readString(inFile);
- Class<org.apache.hadoop.mapred.InputSplit> cls;
- try {
- cls =
- (Class<org.apache.hadoop.mapred.InputSplit>)
- jobConf.getClassByName(className);
- } catch (ClassNotFoundException ce) {
- IOException wrap = new IOException("Split class " + className +
- " not found");
- wrap.initCause(ce);
- throw wrap;
- }
- SerializationFactory factory = new SerializationFactory(jobConf);
- Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer =
- (Deserializer<org.apache.hadoop.mapred.InputSplit>)
- factory.getDeserializer(cls);
- deserializer.open(inFile);
- org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
- long pos = inFile.getPos();
- inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
- .increment(pos - offset);
- inFile.close();
- return split;
- }
-
- @SuppressWarnings("unchecked")
- private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
- TaskSplitIndex splitMetaInfo) throws IOException {
- Path file = new Path(splitMetaInfo.getSplitLocation());
- long offset = splitMetaInfo.getStartOffset();
-
- // Split information read from local filesystem.
- FileSystem fs = FileSystem.getLocal(jobConf);
- file = fs.makeQualified(file);
- LOG.info("Reading input split file from : " + file);
- FSDataInputStream inFile = fs.open(file);
- inFile.seek(offset);
- String className = Text.readString(inFile);
- Class<org.apache.hadoop.mapreduce.InputSplit> cls;
- try {
- cls =
- (Class<org.apache.hadoop.mapreduce.InputSplit>)
- jobConf.getClassByName(className);
- } catch (ClassNotFoundException ce) {
- IOException wrap = new IOException("Split class " + className +
- " not found");
- wrap.initCause(ce);
- throw wrap;
- }
- SerializationFactory factory = new SerializationFactory(jobConf);
- Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer =
- (Deserializer<org.apache.hadoop.mapreduce.InputSplit>)
- factory.getDeserializer(cls);
- deserializer.open(inFile);
- org.apache.hadoop.mapreduce.InputSplit split =
- deserializer.deserialize(null);
- long pos = inFile.getPos();
- inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
- .increment(pos - offset);
- inFile.close();
- return split;
- }
-
- private void setIncrementalConfigParams(InputSplit inputSplit) {
- if (inputSplit instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) inputSplit;
- this.incrementalConf = new Configuration(false);
-
- this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
- .toString());
- this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
- fileSplit.getStart());
- this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
- fileSplit.getLength());
- }
- LOG.info("Processing split: " + inputSplit);
- }
-
- private long getInputBytes() {
- if (fsStats == null) return 0;
- long bytesRead = 0;
- for (Statistics stat: fsStats) {
- bytesRead = bytesRead + stat.getBytesRead();
- }
- return bytesRead;
- }
-
- protected TaskSplitMetaInfo[] readSplits(Configuration conf)
- throws IOException {
- TaskSplitMetaInfo[] allTaskSplitMetaInfo;
- allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
- FileSystem.getLocal(conf));
- return allTaskSplitMetaInfo;
- }
-
- private class MRInputKVReader implements KVReader {
-
- Object key;
- Object value;
-
- private SimpleValueIterator valueIterator = new SimpleValueIterator();
- private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
-
- private final boolean localNewApi;
-
- MRInputKVReader() {
- localNewApi = useNewApi;
- if (!localNewApi) {
- key = oldRecordReader.createKey();
- value =oldRecordReader.createValue();
- }
- }
-
- // Setup the values iterator once, and set value on the same object each time
- // to prevent lots of objects being created.
-
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean next() throws IOException {
- boolean hasNext = false;
- long bytesInPrev = getInputBytes();
- if (localNewApi) {
- try {
- hasNext = newRecordReader.nextKeyValue();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while checking for next key-value", e);
- }
- } else {
- hasNext = oldRecordReader.next(key, value);
- }
- long bytesInCurr = getInputBytes();
- fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-
- if (hasNext) {
- inputRecordCounter.increment(1);
- }
-
- return hasNext;
- }
-
- @Override
- public KVRecord getCurrentKV() throws IOException {
- KVRecord kvRecord = null;
- if (localNewApi) {
- try {
- valueIterator.setValue(newRecordReader.getCurrentValue());
- kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while fetching next key-value", e);
- }
-
- } else {
- valueIterator.setValue(value);
- kvRecord = new KVRecord(key, valueIterable);
- }
- return kvRecord;
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
deleted file mode 100644
index 4e61aa7..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.input;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.mapred.RecordReader;
-
-public class SimpleInputLegacy extends SimpleInput {
-
- @Private
- public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
- return this.newInputSplit;
- }
-
- @SuppressWarnings("rawtypes")
- @Private
- public RecordReader getOldRecordReader() {
- return this.oldRecordReader;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
new file mode 100644
index 0000000..e6bdbe6
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -0,0 +1,326 @@
+package org.apache.tez.mapreduce.output;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Event;
+import org.apache.tez.engine.api.KVWriter;
+import org.apache.tez.engine.api.LogicalOutput;
+import org.apache.tez.engine.api.TezOutputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+public class MROutput implements LogicalOutput {
+
+ private static final Log LOG = LogFactory.getLog(MROutput.class);
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ private TezOutputContext outputContext;
+ private JobConf jobConf;
+ boolean useNewApi;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+
+ @SuppressWarnings("rawtypes")
+ org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+ @SuppressWarnings("rawtypes")
+ org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
+
+ @SuppressWarnings("rawtypes")
+ org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+ @SuppressWarnings("rawtypes")
+ org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
+
+ private TezCounter outputRecordCounter;
+ private TezCounter fileOutputByteCounter;
+ private List<Statistics> fsStats;
+
+ private TaskAttemptContext newApiTaskAttemptContext;
+ private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
+
+ private boolean isMapperOutput;
+
+ private OutputCommitter committer;
+
+ @Override
+ public List<Event> initialize(TezOutputContext outputContext)
+ throws IOException, InterruptedException {
+ LOG.info("Initializing Simple Output");
+ this.outputContext = outputContext;
+ Configuration conf = TezUtils.createConfFromUserPayload(
+ outputContext.getUserPayload());
+ this.jobConf = new JobConf(conf);
+ this.useNewApi = this.jobConf.getUseNewMapper();
+ this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
+ false);
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+ outputContext.getDAGAttemptNumber());
+
+ outputRecordCounter = outputContext.getCounters().findCounter(
+ TaskCounter.MAP_OUTPUT_RECORDS);
+ fileOutputByteCounter = outputContext.getCounters().findCounter(
+ FileOutputFormatCounter.BYTES_WRITTEN);
+
+ if (useNewApi) {
+ newApiTaskAttemptContext = createTaskAttemptContext();
+ try {
+ newOutputFormat =
+ ReflectionUtils.newInstance(
+ newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ List<Statistics> matchedStats = null;
+ if (newOutputFormat instanceof
+ org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+ matchedStats =
+ Utils.getFsStatistics(
+ org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+ .getOutputPath(newApiTaskAttemptContext),
+ jobConf);
+ }
+ fsStats = matchedStats;
+
+ long bytesOutPrev = getOutputBytes();
+ try {
+ newRecordWriter =
+ newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while creating record writer", e);
+ }
+ long bytesOutCurr = getOutputBytes();
+ fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+ } else {
+ TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
+ outputContext.getApplicationId().getClusterTimestamp()),
+ outputContext.getApplicationId().getId(),
+ (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
+ outputContext.getTaskIndex()),
+ outputContext.getTaskAttemptNumber());
+ jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+ jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+ jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+ jobConf.setInt(JobContext.TASK_PARTITION,
+ taskAttemptId.getTaskID().getId());
+ jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
+ oldApiTaskAttemptContext =
+ new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
+ jobConf, taskAttemptId,
+ new MRTaskReporter(outputContext));
+ oldOutputFormat = jobConf.getOutputFormat();
+
+ List<Statistics> matchedStats = null;
+ if (oldOutputFormat
+ instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+ matchedStats =
+ Utils.getFsStatistics(
+ org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
+ jobConf),
+ jobConf);
+ }
+ fsStats = matchedStats;
+
+ FileSystem fs = FileSystem.get(jobConf);
+ String finalName = getOutputName();
+
+ long bytesOutPrev = getOutputBytes();
+ oldRecordWriter =
+ oldOutputFormat.getRecordWriter(
+ fs, jobConf, finalName, new MRReporter(outputContext));
+ long bytesOutCurr = getOutputBytes();
+ fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+ }
+ initCommitter(jobConf, useNewApi);
+
+ LOG.info("Initialized Simple Output"
+ + ", using_new_api: " + useNewApi);
+ return null;
+ }
+
+ public void initCommitter(JobConf job, boolean useNewApi)
+ throws IOException, InterruptedException {
+
+ if (useNewApi) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("using new api for output committer");
+ }
+
+ OutputFormat<?, ?> outputFormat = null;
+ try {
+ outputFormat = ReflectionUtils.newInstance(
+ newApiTaskAttemptContext.getOutputFormatClass(), job);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Unknown OutputFormat", cnfe);
+ }
+ this.committer = outputFormat.getOutputCommitter(
+ newApiTaskAttemptContext);
+ } else {
+ this.committer = job.getOutputCommitter();
+ }
+
+ Path outputPath = FileOutputFormat.getOutputPath(job);
+ if (outputPath != null) {
+ if ((this.committer instanceof FileOutputCommitter)) {
+ FileOutputFormat.setWorkOutputPath(job,
+ ((FileOutputCommitter) this.committer).getTaskAttemptPath(
+ oldApiTaskAttemptContext));
+ } else {
+ FileOutputFormat.setWorkOutputPath(job, outputPath);
+ }
+ }
+ if (useNewApi) {
+ this.committer.setupTask(newApiTaskAttemptContext);
+ } else {
+ this.committer.setupTask(oldApiTaskAttemptContext);
+ }
+ }
+
+ public boolean isCommitRequired() throws IOException {
+ if (useNewApi) {
+ return committer.needsTaskCommit(newApiTaskAttemptContext);
+ } else {
+ return committer.needsTaskCommit(oldApiTaskAttemptContext);
+ }
+ }
+
+ private TaskAttemptContext createTaskAttemptContext() {
+ return new TaskAttemptContextImpl(this.jobConf, outputContext,
+ isMapperOutput);
+ }
+
+ private long getOutputBytes() {
+ if (fsStats == null) return 0;
+ long bytesWritten = 0;
+ for (Statistics stat: fsStats) {
+ bytesWritten = bytesWritten + stat.getBytesWritten();
+ }
+ return bytesWritten;
+ }
+
+ private String getOutputName() {
+ return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
+ }
+
+ @Override
+ public KVWriter getWriter() throws IOException {
+ return new KVWriter() {
+ private final boolean useNewWriter = useNewApi;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ long bytesOutPrev = getOutputBytes();
+ if (useNewWriter) {
+ try {
+ newRecordWriter.write(key, value);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while writing next key-value",e);
+ }
+ } else {
+ oldRecordWriter.write(key, value);
+ }
+
+ long bytesOutCurr = getOutputBytes();
+ fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+ outputRecordCounter.increment(1);
+ }
+ };
+ }
+
+ @Override
+ public void handleEvents(List<Event> outputEvents) {
+ // Not expecting any events at the moment.
+ }
+
+ @Override
+ public synchronized List<Event> close() throws IOException {
+ if (closed.getAndSet(true)) {
+ return null;
+ }
+
+ LOG.info("Closing Simple Output");
+ long bytesOutPrev = getOutputBytes();
+ if (useNewApi) {
+ try {
+ newRecordWriter.close(newApiTaskAttemptContext);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while closing record writer", e);
+ }
+ } else {
+ oldRecordWriter.close(null);
+ }
+ long bytesOutCurr = getOutputBytes();
+ fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+ LOG.info("Closed Simple Output");
+ return null;
+ }
+
+ @Override
+ public void setNumPhysicalOutputs(int numOutputs) {
+ // Nothing to do for now
+ }
+
+ /**
+ * MROutput expects the Processor to call commit prior to the
+ * Processor's completion.
+ * @throws IOException
+ */
+ public void commit() throws IOException {
+ close();
+ if (useNewApi) {
+ committer.commitTask(newApiTaskAttemptContext);
+ } else {
+ committer.commitTask(oldApiTaskAttemptContext);
+ }
+ }
+
+
+ /**
+ * MROutput expects the Processor to call abort in case of any error
+ * (including an error during commit) prior to the Processor's completion.
+ * @throws IOException
+ */
+ public void abort() throws IOException {
+ close();
+ if (useNewApi) {
+ committer.abortTask(newApiTaskAttemptContext);
+ } else {
+ committer.abortTask(oldApiTaskAttemptContext);
+ }
+ }
+
+}
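The commit/abort javadoc above spells out MROutput's lifecycle: a processor writes records through getWriter(), then calls commit() before completing (or abort() on any error); both first close the output. A hedged end-to-end sketch using only the methods defined in this file (the key/value objects and the error-handling policy are illustrative):

    // Sketch of the write-then-commit contract described above.
    // 'output' is the initialized MROutput; key/value are whatever the
    // configured OutputFormat expects.
    KVWriter writer = output.getWriter();
    try {
      writer.write(key, value);          // repeated once per record
      if (output.isCommitRequired()) {
        output.commit();                 // closes the output, then commits
      } else {
        output.close();
      }
    } catch (IOException e) {
      output.abort();                    // closes, then aborts the attempt
      throw e;
    }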
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
deleted file mode 100644
index d82c9e2..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
+++ /dev/null
@@ -1,326 +0,0 @@
-package org.apache.tez.mapreduce.output;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
-import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
-import org.apache.tez.mapreduce.processor.MRTaskReporter;
-
-public class SimpleOutput implements LogicalOutput {
-
- private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
-
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- private TezOutputContext outputContext;
- private JobConf jobConf;
- boolean useNewApi;
- private AtomicBoolean closed = new AtomicBoolean(false);
-
- @SuppressWarnings("rawtypes")
- org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
- @SuppressWarnings("rawtypes")
- org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
-
- @SuppressWarnings("rawtypes")
- org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
- @SuppressWarnings("rawtypes")
- org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
-
- private TezCounter outputRecordCounter;
- private TezCounter fileOutputByteCounter;
- private List<Statistics> fsStats;
-
- private TaskAttemptContext newApiTaskAttemptContext;
- private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
-
- private boolean isMapperOutput;
-
- private OutputCommitter committer;
-
- @Override
- public List<Event> initialize(TezOutputContext outputContext)
- throws IOException, InterruptedException {
- LOG.info("Initializing Simple Output");
- this.outputContext = outputContext;
- Configuration conf = TezUtils.createConfFromUserPayload(
- outputContext.getUserPayload());
- this.jobConf = new JobConf(conf);
- this.useNewApi = this.jobConf.getUseNewMapper();
- this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
- false);
- jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
- outputContext.getDAGAttemptNumber());
-
- outputRecordCounter = outputContext.getCounters().findCounter(
- TaskCounter.MAP_OUTPUT_RECORDS);
- fileOutputByteCounter = outputContext.getCounters().findCounter(
- FileOutputFormatCounter.BYTES_WRITTEN);
-
- if (useNewApi) {
- newApiTaskAttemptContext = createTaskAttemptContext();
- try {
- newOutputFormat =
- ReflectionUtils.newInstance(
- newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- }
-
- List<Statistics> matchedStats = null;
- if (newOutputFormat instanceof
- org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
- matchedStats =
- Utils.getFsStatistics(
- org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
- .getOutputPath(newApiTaskAttemptContext),
- jobConf);
- }
- fsStats = matchedStats;
-
- long bytesOutPrev = getOutputBytes();
- try {
- newRecordWriter =
- newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while creating record writer", e);
- }
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
- } else {
- TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
- outputContext.getApplicationId().getClusterTimestamp()),
- outputContext.getApplicationId().getId(),
- (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
- outputContext.getTaskIndex()),
- outputContext.getTaskAttemptNumber());
- jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
- jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
- jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
- jobConf.setInt(JobContext.TASK_PARTITION,
- taskAttemptId.getTaskID().getId());
- jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
-
- oldApiTaskAttemptContext =
- new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
- jobConf, taskAttemptId,
- new MRTaskReporter(outputContext));
- oldOutputFormat = jobConf.getOutputFormat();
-
- List<Statistics> matchedStats = null;
- if (oldOutputFormat
- instanceof org.apache.hadoop.mapred.FileOutputFormat) {
- matchedStats =
- Utils.getFsStatistics(
- org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
- jobConf),
- jobConf);
- }
- fsStats = matchedStats;
-
- FileSystem fs = FileSystem.get(jobConf);
- String finalName = getOutputName();
-
- long bytesOutPrev = getOutputBytes();
- oldRecordWriter =
- oldOutputFormat.getRecordWriter(
- fs, jobConf, finalName, new MRReporter(outputContext));
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
- }
- initCommitter(jobConf, useNewApi);
-
- LOG.info("Initialized Simple Output"
- + ", using_new_api: " + useNewApi);
- return null;
- }
-
- public void initCommitter(JobConf job, boolean useNewApi)
- throws IOException, InterruptedException {
-
- if (useNewApi) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("using new api for output committer");
- }
-
- OutputFormat<?, ?> outputFormat = null;
- try {
- outputFormat = ReflectionUtils.newInstance(
- newApiTaskAttemptContext.getOutputFormatClass(), job);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException("Unknown OutputFormat", cnfe);
- }
- this.committer = outputFormat.getOutputCommitter(
- newApiTaskAttemptContext);
- } else {
- this.committer = job.getOutputCommitter();
- }
-
- Path outputPath = FileOutputFormat.getOutputPath(job);
- if (outputPath != null) {
- if ((this.committer instanceof FileOutputCommitter)) {
- FileOutputFormat.setWorkOutputPath(job,
- ((FileOutputCommitter) this.committer).getTaskAttemptPath(
- oldApiTaskAttemptContext));
- } else {
- FileOutputFormat.setWorkOutputPath(job, outputPath);
- }
- }
- if (useNewApi) {
- this.committer.setupTask(newApiTaskAttemptContext);
- } else {
- this.committer.setupTask(oldApiTaskAttemptContext);
- }
- }
-
- public boolean isCommitRequired() throws IOException {
- if (useNewApi) {
- return committer.needsTaskCommit(newApiTaskAttemptContext);
- } else {
- return committer.needsTaskCommit(oldApiTaskAttemptContext);
- }
- }
-
- private TaskAttemptContext createTaskAttemptContext() {
- return new TaskAttemptContextImpl(this.jobConf, outputContext,
- isMapperOutput);
- }
-
- private long getOutputBytes() {
- if (fsStats == null) return 0;
- long bytesWritten = 0;
- for (Statistics stat: fsStats) {
- bytesWritten = bytesWritten + stat.getBytesWritten();
- }
- return bytesWritten;
- }
-
- private String getOutputName() {
- return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
- }
-
- @Override
- public KVWriter getWriter() throws IOException {
- return new KVWriter() {
- private final boolean useNewWriter = useNewApi;
-
- @SuppressWarnings("unchecked")
- @Override
- public void write(Object key, Object value) throws IOException {
- long bytesOutPrev = getOutputBytes();
- if (useNewWriter) {
- try {
- newRecordWriter.write(key, value);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while writing next key-value",e);
- }
- } else {
- oldRecordWriter.write(key, value);
- }
-
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
- outputRecordCounter.increment(1);
- }
- };
- }
-
- @Override
- public void handleEvents(List<Event> outputEvents) {
- // Not expecting any events at the moment.
- }
-
- @Override
- public synchronized List<Event> close() throws IOException {
- if (closed.getAndSet(true)) {
- return null;
- }
-
- LOG.info("Closing Simple Output");
- long bytesOutPrev = getOutputBytes();
- if (useNewApi) {
- try {
- newRecordWriter.close(newApiTaskAttemptContext);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while closing record writer", e);
- }
- } else {
- oldRecordWriter.close(null);
- }
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
- LOG.info("Closed Simple Output");
- return null;
- }
-
- @Override
- public void setNumPhysicalOutputs(int numOutputs) {
- // Nothing to do for now
- }
-
- /**
- * SimpleOutput expects that a Processor calls commit prior to the
- * Processor's completion.
- * @throws IOException
- */
- public void commit() throws IOException {
- close();
- if (useNewApi) {
- committer.commitTask(newApiTaskAttemptContext);
- } else {
- committer.commitTask(oldApiTaskAttemptContext);
- }
- }
-
-
- /**
- * SimpleOutput expects that a Processor calls abort in case of any error
- * (including an error during commit) prior to the Processor's completion.
- * @throws IOException
- */
- public void abort() throws IOException {
- close();
- if (useNewApi) {
- committer.abortTask(newApiTaskAttemptContext);
- } else {
- committer.abortTask(oldApiTaskAttemptContext);
- }
- }
-
-}
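For readers following the rename: the commit/abort contract described in the javadoc above is exactly what callers of the renamed MROutput are expected to honor. A minimal sketch of a processor driving that lifecycle (the helper class, variable names, and import paths are illustrative assumptions; only the MROutput methods used, getWriter / isCommitRequired / commit / abort, come from the file above):

import java.io.IOException;

import org.apache.tez.engine.api.KVWriter;
import org.apache.tez.mapreduce.output.MROutput;

class MROutputCommitSketch {
  // Writes a single record, then commits if the committer asks for it;
  // on failure, aborts so the committer can clean up the attempt's output.
  static void writeAndCommit(MROutput output, Object key, Object value)
      throws IOException {
    KVWriter writer = output.getWriter();   // creates the RecordWriter on demand
    try {
      writer.write(key, value);             // also bumps the record/byte counters
      if (output.isCommitRequired()) {      // OutputCommitter.needsTaskCommit(...)
        output.commit();                    // close(), then committer.commitTask(...)
      }
    } catch (IOException ioe) {
      output.abort();                       // close(), then committer.abortTask(...)
      throw ioe;
    }
  }
}

Note that calling abort() after a failed commit() is safe: close() guards itself with an AtomicBoolean, so the second close is a no-op, matching the "including an error during commit" wording in the javadoc.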
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index f7404d4..fac1454 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -81,7 +81,7 @@ import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
@SuppressWarnings("deprecation")
public abstract class MRTask {
@@ -423,8 +423,8 @@ public abstract class MRTask {
+ " And is in the process of committing");
// TODO change this to use the new context
// TODO TEZ Interaction between Commit and OutputReady. Merge?
- if (output instanceof SimpleOutput) {
- SimpleOutput sOut = (SimpleOutput)output;
+ if (output instanceof MROutput) {
+ MROutput sOut = (MROutput)output;
if (sOut.isCommitRequired()) {
//wait for commit approval and commit
// TODO EVENTUALLY - Commit is not required for map tasks.
@@ -458,7 +458,7 @@ public abstract class MRTask {
statusUpdate();
}
- private void commit(SimpleOutput output) throws IOException {
+ private void commit(MROutput output) throws IOException {
int retries = 3;
while (true) {
// This will loop till the AM asks for the task to be killed. As
@@ -495,7 +495,7 @@ public abstract class MRTask {
}
private
- void discardOutput(SimpleOutput output) {
+ void discardOutput(MROutput output) {
try {
output.abort();
} catch (IOException ioe) {
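The commit(MROutput) hunk above is cut off right after the retry counter; per the surviving comment, the real loop keeps going until the AM asks for the task to be killed. Purely as a shape sketch, with the loop body assumed rather than copied from MRTask:

import java.io.IOException;

import org.apache.tez.mapreduce.output.MROutput;

class CommitRetrySketch {
  // Assumed reconstruction: retry the commit a few times, then discard the
  // task's output (via abort) and propagate the failure. The actual MRTask
  // loop additionally coordinates with the AM before each attempt.
  static void commitWithRetries(MROutput output) throws IOException {
    int retries = 3;
    while (true) {
      try {
        output.commit();          // close the output and commitTask()
        return;
      } catch (IOException ioe) {
        if (--retries == 0) {
          output.abort();         // what discardOutput(MROutput) does above
          throw ioe;
        }
      }
    }
  }
}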
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 22312f7..85139ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -65,7 +65,7 @@ public class MRTaskReporter
if (isProcessorContext) {
((TezProcessorContext)context).setProgress(progress);
} else {
- // TODO FIXME NEWTEZ - will simpleoutput's reporter use this api?
+ // TODO FIXME NEWTEZ - will MROutput's reporter use this api?
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 2084146..e4b990a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -44,9 +44,9 @@ import org.apache.tez.engine.api.LogicalOutput;
import org.apache.tez.engine.api.TezProcessorContext;
import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
-import org.apache.tez.mapreduce.input.SimpleInput;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -99,15 +99,15 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
LogicalOutput out = outputs.values().iterator().next();
// Sanity check
- if (!(in instanceof SimpleInputLegacy)) {
+ if (!(in instanceof MRInputLegacy)) {
throw new IOException(new TezException(
"Only Simple Input supported. Input: " + in.getClass()));
}
- SimpleInputLegacy input = (SimpleInputLegacy)in;
+ MRInputLegacy input = (MRInputLegacy)in;
KVWriter kvWriter = null;
if (!(out instanceof OnFileSortedOutput)) {
- kvWriter = ((SimpleOutput)out).getWriter();
+ kvWriter = ((MROutput)out).getWriter();
} else {
kvWriter = ((OnFileSortedOutput)out).getWriter();
}
@@ -124,13 +124,13 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
void runOldMapper(
final JobConf job,
final MRTaskReporter reporter,
- final SimpleInputLegacy input,
+ final MRInputLegacy input,
final KVWriter output
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
- // Done only for SimpleInput.
- // TODO use new method in SimpleInput to get required info
+ // Done only for MRInput.
+ // TODO use new method in MRInput to get required info
//input.initialize(job, master);
RecordReader in = new OldRecordReader(input);
@@ -147,13 +147,13 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
private void runNewMapper(final JobConf job,
MRTaskReporter reporter,
- final SimpleInputLegacy in,
+ final MRInputLegacy in,
KVWriter out
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
- // Done only for SimpleInput.
- // TODO use new method in SimpleInput to get required info
+ // Done only for MRInput.
+ // TODO use new method in MRInput to get required info
//in.initialize(job, master);
// make a task context so we can get the classes
@@ -197,10 +197,10 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
private static class NewRecordReader extends
org.apache.hadoop.mapreduce.RecordReader {
- private final SimpleInput in;
+ private final MRInput in;
private KVReader reader;
- private NewRecordReader(SimpleInput in) throws IOException {
+ private NewRecordReader(MRInput in) throws IOException {
this.in = in;
this.reader = in.getReader();
}
@@ -241,38 +241,38 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
}
private static class OldRecordReader implements RecordReader {
- private final SimpleInputLegacy simpleInput;
+ private final MRInputLegacy mrInput;
- private OldRecordReader(SimpleInputLegacy simpleInput) {
- this.simpleInput = simpleInput;
+ private OldRecordReader(MRInputLegacy mrInput) {
+ this.mrInput = mrInput;
}
@Override
public boolean next(Object key, Object value) throws IOException {
// TODO broken
-// simpleInput.setKey(key);
-// simpleInput.setValue(value);
+// mrInput.setKey(key);
+// mrInput.setValue(value);
// try {
-// return simpleInput.hasNext();
+// return mrInput.hasNext();
// } catch (InterruptedException ie) {
// throw new IOException(ie);
// }
- return simpleInput.getOldRecordReader().next(key, value);
+ return mrInput.getOldRecordReader().next(key, value);
}
@Override
public Object createKey() {
- return simpleInput.getOldRecordReader().createKey();
+ return mrInput.getOldRecordReader().createKey();
}
@Override
public Object createValue() {
- return simpleInput.getOldRecordReader().createValue();
+ return mrInput.getOldRecordReader().createValue();
}
@Override
public long getPos() throws IOException {
- return simpleInput.getOldRecordReader().getPos();
+ return mrInput.getOldRecordReader().getPos();
}
@Override
@@ -282,7 +282,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
@Override
public float getProgress() throws IOException {
try {
- return simpleInput.getProgress();
+ return mrInput.getProgress();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
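The OldRecordReader adapter above is a thin shim over MRInputLegacy.getOldRecordReader(). Collapsed into a plain read loop, the same old-API contract looks roughly like this (the helper class and import paths are assumptions; getOldRecordReader, createKey/createValue/next, and KVWriter.write all appear in the code above):

import java.io.IOException;

import org.apache.hadoop.mapred.RecordReader;
import org.apache.tez.engine.api.KVWriter;
import org.apache.tez.mapreduce.input.MRInputLegacy;

class OldApiReadLoopSketch {
  // Pumps every record from the legacy mapred reader into a KVWriter
  // (e.g. one obtained from MROutput.getWriter()).
  @SuppressWarnings({ "rawtypes", "unchecked" })
  static void pump(MRInputLegacy input, KVWriter out) throws IOException {
    RecordReader reader = input.getOldRecordReader();
    Object key = reader.createKey();       // reusable key/value holders,
    Object value = reader.createValue();   // per the old-API contract
    while (reader.next(key, value)) {
      out.write(key, value);
    }
  }
}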
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 9274765..19acb39 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -48,7 +48,7 @@ import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -133,8 +133,8 @@ implements LogicalIOProcessor {
KVReader kvReader = shuffleInput.getReader();
KVWriter kvWriter = null;
- if((out instanceof SimpleOutput)) {
- kvWriter = ((SimpleOutput) out).getWriter();
+ if((out instanceof MROutput)) {
+ kvWriter = ((MROutput) out).getWriter();
} else if ((out instanceof OnFileSortedOutput)) {
kvWriter = ((OnFileSortedOutput) out).getWriter();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 06e2f4b..89292ab 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -48,7 +48,7 @@ import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.junit.After;
@@ -120,7 +120,7 @@ public class TestMapProcessor {
MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
- InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,
@@ -191,7 +191,7 @@ public class TestMapProcessor {
// localFs, workDir, job, 0, new Path(workDir, "map0"),
// new TestUmbilicalProtocol(true), vertexName,
// Collections.singletonList(new InputSpec("NullVertex", 0,
-// SimpleInput.class.getName())),
+// MRInput.class.getName())),
// Collections.singletonList(new OutputSpec("FakeVertex", 1,
// OldInMemorySortedOutput.class.getName()))
// );
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index a3abd76..274c353 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -54,8 +54,8 @@ import org.apache.tez.mapreduce.hadoop.IDConverter;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.junit.After;
@@ -125,7 +125,7 @@ public class TestReduceProcessor {
Path mapInput = new Path(workDir, "map0");
MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
- InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
// Run a map
LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,
@@ -152,7 +152,7 @@ public class TestReduceProcessor {
ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
- OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1);
+ OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(MROutput.class.getName()), 1);
// Now run a reduce
TaskSpec taskSpec = new TaskSpec(