You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/24 08:38:22 UTC

git commit: TEZ-490. Rename SimpleInput / SimpleOutput to be MR specific (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 c5a8a3c6e -> 5d86b9350


TEZ-490. Rename SimpleInput / SimpleOutput to be MR specific (part of
TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5d86b935
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5d86b935
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5d86b935

Branch: refs/heads/TEZ-398
Commit: 5d86b9350555819b26110c200c8f3cdda6893020
Parents: c5a8a3c
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 23 23:37:53 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 23 23:37:53 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  20 +-
 .../broadcast/input/BroadcastKVReader.java      |   2 +-
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |   8 +-
 .../org/apache/tez/mapreduce/input/MRInput.java | 438 +++++++++++++++++++
 .../tez/mapreduce/input/MRInputLegacy.java      |  36 ++
 .../apache/tez/mapreduce/input/SimpleInput.java | 438 -------------------
 .../tez/mapreduce/input/SimpleInputLegacy.java  |  36 --
 .../apache/tez/mapreduce/output/MROutput.java   | 326 ++++++++++++++
 .../tez/mapreduce/output/SimpleOutput.java      | 326 --------------
 .../apache/tez/mapreduce/processor/MRTask.java  |  10 +-
 .../tez/mapreduce/processor/MRTaskReporter.java |   2 +-
 .../mapreduce/processor/map/MapProcessor.java   |  50 +--
 .../processor/reduce/ReduceProcessor.java       |   6 +-
 .../processor/map/TestMapProcessor.java         |   6 +-
 .../processor/reduce/TestReduceProcessor.java   |   8 +-
 15 files changed, 856 insertions(+), 856 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index f32fa6b..1967462 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -85,8 +85,8 @@ import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -497,20 +497,20 @@ public class YarnTezDagChild {
 
     // FIXME need Input/Output vertices else we have this hack
     if (taskSpec.getInputs().isEmpty()) {
-      InputDescriptor simpleInputDesc =
-          new InputDescriptor(SimpleInputLegacy.class.getName());
-      simpleInputDesc.setUserPayload(
+      InputDescriptor mrInputDesc =
+          new InputDescriptor(MRInputLegacy.class.getName());
+      mrInputDesc.setUserPayload(
           taskSpec.getProcessorDescriptor().getUserPayload());
       taskSpec.getInputs().add(
-          new InputSpec("null", simpleInputDesc, 0));
+          new InputSpec("null", mrInputDesc, 0));
     }
     if (taskSpec.getOutputs().isEmpty()) {
-      OutputDescriptor simpleOutputDesc =
-          new OutputDescriptor(SimpleOutput.class.getName());
-      simpleOutputDesc.setUserPayload(
+      OutputDescriptor mrOutputDesc =
+          new OutputDescriptor(MROutput.class.getName());
+      mrOutputDesc.setUserPayload(
           taskSpec.getProcessorDescriptor().getUserPayload());
       taskSpec.getOutputs().add(
-          new OutputSpec("null", simpleOutputDesc, 0));
+          new OutputSpec("null", mrOutputDesc, 0));
     }
     String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
     conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
index 0b86a8e..2c53e75 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -187,7 +187,7 @@ public class BroadcastKVReader<K, V> implements KVReader {
 
   
   
-  // TODO NEWTEZ Move this into a common class. Also used in SImpleInput
+  // TODO NEWTEZ Move this into a common class. Also used in MRInput
   private class SimpleValueIterator implements Iterator<V> {
 
     private V value;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index f59e836..f2b0a38 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -83,8 +83,8 @@
 //import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
 //import org.apache.tez.mapreduce.hadoop.IDConverter;
 //import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-//import org.apache.tez.mapreduce.input.SimpleInput;
-//import org.apache.tez.mapreduce.output.SimpleOutput;
+//import org.apache.tez.mapreduce.input.MRInput;
+//import org.apache.tez.mapreduce.output.MROutput;
 //import org.apache.tez.mapreduce.processor.map.MapProcessor;
 //import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 //
@@ -252,7 +252,7 @@
 //                  tezMapId, user, localConf.getJobName(), "TODO_vertexName",
 //                  mapProcessorDesc,
 //                  Collections.singletonList(new InputSpec("srcVertex", 0,
-//                      SimpleInput.class.getName())),
+//                      MRInput.class.getName())),
 //                  Collections.singletonList(new OutputSpec("tgtVertex", 0,
 //                      LocalOnFileSorterOutput.class.getName())));
 //
@@ -458,7 +458,7 @@
 //                Collections.singletonList(new InputSpec("TODO_srcVertexName",
 //                    mapIds.size(), LocalMergedInput.class.getName())),
 //                Collections.singletonList(new OutputSpec("TODO_targetVertex",
-//                    0, SimpleOutput.class.getName())));
+//                    0, MROutput.class.getName())));
 //
 //            // move map output to reduce input
 //            for (int i = 0; i < mapIds.size(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
new file mode 100644
index 0000000..6066d93
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Event;
+import org.apache.tez.engine.api.KVReader;
+import org.apache.tez.engine.api.LogicalInput;
+import org.apache.tez.engine.api.TezInputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link MRInput} is a {@link LogicalInput} which provides key/value pairs
+ * for the consumer.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce 
+ * {@link InputFormat} implementations.
+ */
+
+public class MRInput implements LogicalInput {
+
+  private static final Log LOG = LogFactory.getLog(MRInput.class);
+  
+  
+  private TezInputContext inputContext;
+  
+  private JobConf jobConf;
+  private Configuration incrementalConf;
+  private boolean recordReaderCreated = false;
+  
+  boolean useNewApi;
+  
+  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+  
+  @SuppressWarnings("rawtypes")
+  private InputFormat oldInputFormat;
+  @SuppressWarnings("rawtypes")
+  protected RecordReader oldRecordReader;
+
+  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
+  
+  private TezCounter inputRecordCounter;
+  private TezCounter fileInputByteCounter; 
+  private List<Statistics> fsStats;
+
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
+
+    // Read split information.
+    TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
+    this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+        thisTaskMetaInfo.getStartOffset());
+    
+    // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
+    // theory, can be used by the MapProcessor, ReduceProcessor or a custom
+    // processor. (The processor could provide the counter though)
+    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
+    this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
+    
+    useNewApi = this.jobConf.getUseNewMapper();
+
+    if (useNewApi) {
+      TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
+      Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
+      try {
+        inputFormatClazz = taskAttemptContext.getInputFormatClass();
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Unable to instantiate InputFormat class", e);
+      }
+
+      newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+
+      newInputSplit = getNewSplitDetails(splitMetaInfo);
+
+      List<Statistics> matchedStats = null;
+      if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+        matchedStats = Utils.getFsStatistics(
+            ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
+                newInputSplit).getPath(), this.jobConf);
+      }
+      fsStats = matchedStats;
+      
+      try {
+        newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+        newRecordReader.initialize(newInputSplit, taskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while creating record reader", e);
+      }
+    } else { // OLD API
+      oldInputFormat = this.jobConf.getInputFormat();
+      InputSplit oldInputSplit =
+          getOldSplitDetails(splitMetaInfo);
+      
+      
+      List<Statistics> matchedStats = null;
+      if (oldInputSplit instanceof FileSplit) {
+        matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
+      }
+      fsStats = matchedStats;
+      
+      long bytesInPrev = getInputBytes();
+      oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
+          this.jobConf, new MRReporter(inputContext, oldInputSplit));
+      long bytesInCurr = getInputBytes();
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+      setIncrementalConfigParams(oldInputSplit);
+    }    
+    return null;
+  }
+
+  @Override
+  public KVReader getReader() throws IOException {
+    Preconditions
+        .checkState(recordReaderCreated == false,
+            "Only a single instance of record reader can be created for this input.");
+    recordReaderCreated = true;
+    return new MRInputKVReader();
+  }
+
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    // Not expecting any events at the moment.
+  }
+
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    // Not required at the moment. May be required if splits are sent via events.
+  }
+
+  @Override
+  public List<Event> close() throws IOException {
+    long bytesInPrev = getInputBytes();
+    if (useNewApi) {
+      newRecordReader.close();
+    } else {
+      oldRecordReader.close();
+    }
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    
+    return null;
+  }
+
+  /**
+   * {@link MRInput} sets some additional parameters like split location when using
+   * the new API. This method returns the list of additional updates, and
+   * should be used by Processors using the old MapReduce API with {@link MRInput}.
+   * 
+   * @return the additional fields set by {@link MRInput}
+   */
+  public Configuration getConfigUpdates() {
+    return new Configuration(incrementalConf);
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    if (useNewApi) {
+      return newRecordReader.getProgress();
+    } else {
+      return oldRecordReader.getProgress();
+    }
+  }
+
+  
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
+  }
+  
+
+  private static class SimpleValueIterator implements Iterator<Object> {
+
+    private Object value;
+
+    public void setValue(Object value) {
+      this.value = value;
+    }
+
+    public boolean hasNext() {
+      return value != null;
+    }
+
+    public Object next() {
+      Object value = this.value;
+      this.value = null;
+      return value;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class SimpleIterable implements Iterable<Object> {
+    private final Iterator<Object> iterator;
+    public SimpleIterable(Iterator<Object> iterator) {
+      this.iterator = iterator;
+    }
+
+    @Override
+    public Iterator<Object> iterator() {
+      return iterator;
+    }
+  }
+
+
+
+  
+  @SuppressWarnings("unchecked")
+  private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
+      throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    long offset = splitMetaInfo.getStartOffset();
+    
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapred.InputSplit> cls;
+    try {
+      cls = 
+          (Class<org.apache.hadoop.mapred.InputSplit>) 
+          jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + 
+          " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = 
+        (Deserializer<org.apache.hadoop.mapred.InputSplit>) 
+        factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+        .increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+
+  @SuppressWarnings("unchecked")
+  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
+      TaskSplitIndex splitMetaInfo) throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    long offset = splitMetaInfo.getStartOffset();
+    
+    // Split information read from local filesystem.
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapreduce.InputSplit> cls;
+    try {
+      cls = 
+          (Class<org.apache.hadoop.mapreduce.InputSplit>) 
+          jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + 
+          " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = 
+        (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) 
+        factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapreduce.InputSplit split = 
+        deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+        .increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+
+  private void setIncrementalConfigParams(InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      this.incrementalConf = new Configuration(false);
+
+      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
+          .toString());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
+          fileSplit.getStart());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
+          fileSplit.getLength());
+    }
+    LOG.info("Processing split: " + inputSplit);
+  }
+
+  private long getInputBytes() {
+    if (fsStats == null) return 0;
+    long bytesRead = 0;
+    for (Statistics stat: fsStats) {
+      bytesRead = bytesRead + stat.getBytesRead();
+    }
+    return bytesRead;
+  }
+
+  protected TaskSplitMetaInfo[] readSplits(Configuration conf)
+      throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
+        FileSystem.getLocal(conf));
+    return allTaskSplitMetaInfo;
+  }
+  
+  private class MRInputKVReader implements KVReader {
+    
+    Object key;
+    Object value;
+
+    private SimpleValueIterator valueIterator = new SimpleValueIterator();
+    private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
+
+    private final boolean localNewApi;
+    
+    MRInputKVReader() {
+      localNewApi = useNewApi;
+      if (!localNewApi) {
+        key = oldRecordReader.createKey();
+        value =oldRecordReader.createValue();
+      }
+    }
+    
+    // Set up the value iterator once, and set the value on the same object each time
+    // to prevent lots of objects being created.
+
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean next() throws IOException {
+      boolean hasNext = false;
+      long bytesInPrev = getInputBytes();
+      if (localNewApi) {
+        try {
+          hasNext = newRecordReader.nextKeyValue();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while checking for next key-value", e);
+        }
+      } else {
+        hasNext = oldRecordReader.next(key, value);
+      }
+      long bytesInCurr = getInputBytes();
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+      
+      if (hasNext) {
+        inputRecordCounter.increment(1);
+      }
+      
+      return hasNext;
+    }
+
+    @Override
+    public KVRecord getCurrentKV() throws IOException {
+      KVRecord kvRecord = null;
+      if (localNewApi) {
+        try {
+          valueIterator.setValue(newRecordReader.getCurrentValue());
+          kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while fetching next key-value", e);
+        }
+        
+      } else {
+        valueIterator.setValue(value);
+        kvRecord = new KVRecord(key, valueIterable);
+      }
+      return kvRecord;
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
new file mode 100644
index 0000000..5923746
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class MRInputLegacy extends MRInput {
+
+  @Private
+  public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
+    return this.newInputSplit;
+  }  
+  
+  @SuppressWarnings("rawtypes")
+  @Private
+  public RecordReader getOldRecordReader() {
+    return this.oldRecordReader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
deleted file mode 100644
index 598f801..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.input;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
-import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
-
-import com.google.common.base.Preconditions;
-
-/**
- * {@link SimpleInput} is an {@link Input} which provides key/values pairs
- * for the consumer.
- *
- * It is compatible with all standard Apache Hadoop MapReduce 
- * {@link InputFormat} implementations.
- */
-
-public class SimpleInput implements LogicalInput {
-
-  private static final Log LOG = LogFactory.getLog(SimpleInput.class);
-  
-  
-  private TezInputContext inputContext;
-  
-  private JobConf jobConf;
-  private Configuration incrementalConf;
-  private boolean recordReaderCreated = false;
-  
-  boolean useNewApi;
-  
-  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
-
-  @SuppressWarnings("rawtypes")
-  private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
-  @SuppressWarnings("rawtypes")
-  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
-  protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
-  
-  @SuppressWarnings("rawtypes")
-  private InputFormat oldInputFormat;
-  @SuppressWarnings("rawtypes")
-  protected RecordReader oldRecordReader;
-
-  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
-  
-  private TezCounter inputRecordCounter;
-  private TezCounter fileInputByteCounter; 
-  private List<Statistics> fsStats;
-
-  @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-    this.jobConf = new JobConf(conf);
-
-    // Read split information.
-    TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
-    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
-    this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
-        thisTaskMetaInfo.getStartOffset());
-    
-    // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
-    // theory, can be used by the MapProcessor, ReduceProcessor or a custom
-    // processor. (The processor could provide the counter though)
-    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
-    this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
-    
-    useNewApi = this.jobConf.getUseNewMapper();
-
-    if (useNewApi) {
-      TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
-      Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
-      try {
-        inputFormatClazz = taskAttemptContext.getInputFormatClass();
-      } catch (ClassNotFoundException e) {
-        throw new IOException("Unable to instantiate InputFormat class", e);
-      }
-
-      newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
-
-      newInputSplit = getNewSplitDetails(splitMetaInfo);
-
-      List<Statistics> matchedStats = null;
-      if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
-        matchedStats = Utils.getFsStatistics(
-            ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
-                newInputSplit).getPath(), this.jobConf);
-      }
-      fsStats = matchedStats;
-      
-      try {
-        newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
-        newRecordReader.initialize(newInputSplit, taskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while creating record reader", e);
-      }
-    } else { // OLD API
-      oldInputFormat = this.jobConf.getInputFormat();
-      InputSplit oldInputSplit =
-          getOldSplitDetails(splitMetaInfo);
-      
-      
-      List<Statistics> matchedStats = null;
-      if (oldInputSplit instanceof FileSplit) {
-        matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
-      }
-      fsStats = matchedStats;
-      
-      long bytesInPrev = getInputBytes();
-      oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
-          this.jobConf, new MRReporter(inputContext, oldInputSplit));
-      long bytesInCurr = getInputBytes();
-      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-      setIncrementalConfigParams(oldInputSplit);
-    }    
-    return null;
-  }
-
-  @Override
-  public KVReader getReader() throws IOException {
-    Preconditions
-        .checkState(recordReaderCreated == false,
-            "Only a single instance of record reader can be created for this input.");
-    recordReaderCreated = true;
-    return new MRInputKVReader();
-  }
-
-
-  @Override
-  public void handleEvents(List<Event> inputEvents) {
-    // Not expecting any events at the moment.
-  }
-
-
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    // Not required at the moment. May be required if splits are sent via events.
-  }
-
-  @Override
-  public List<Event> close() throws IOException {
-    long bytesInPrev = getInputBytes();
-    if (useNewApi) {
-      newRecordReader.close();
-    } else {
-      oldRecordReader.close();
-    }
-    long bytesInCurr = getInputBytes();
-    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-    
-    return null;
-  }
-
-  /**
-   * SimpleInputs sets some additional parameters like split location when using
-   * the new API. This methods returns the list of additional updates, and
-   * should be used by Processors using the old MapReduce API with SimpleInput.
-   * 
-   * @return the additional fields set by SimpleInput
-   */
-  public Configuration getConfigUpdates() {
-    return new Configuration(incrementalConf);
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    if (useNewApi) {
-      return newRecordReader.getProgress();
-    } else {
-      return oldRecordReader.getProgress();
-    }
-  }
-
-  
-  private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
-  }
-  
-
-  private static class SimpleValueIterator implements Iterator<Object> {
-
-    private Object value;
-
-    public void setValue(Object value) {
-      this.value = value;
-    }
-
-    public boolean hasNext() {
-      return value != null;
-    }
-
-    public Object next() {
-      Object value = this.value;
-      this.value = null;
-      return value;
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private static class SimpleIterable implements Iterable<Object> {
-    private final Iterator<Object> iterator;
-    public SimpleIterable(Iterator<Object> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<Object> iterator() {
-      return iterator;
-    }
-  }
-
-
-
-  
-  @SuppressWarnings("unchecked")
-  private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
-      throws IOException {
-    Path file = new Path(splitMetaInfo.getSplitLocation());
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    file = fs.makeQualified(file);
-    LOG.info("Reading input split file from : " + file);
-    long offset = splitMetaInfo.getStartOffset();
-    
-    FSDataInputStream inFile = fs.open(file);
-    inFile.seek(offset);
-    String className = Text.readString(inFile);
-    Class<org.apache.hadoop.mapred.InputSplit> cls;
-    try {
-      cls = 
-          (Class<org.apache.hadoop.mapred.InputSplit>) 
-          jobConf.getClassByName(className);
-    } catch (ClassNotFoundException ce) {
-      IOException wrap = new IOException("Split class " + className + 
-          " not found");
-      wrap.initCause(ce);
-      throw wrap;
-    }
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = 
-        (Deserializer<org.apache.hadoop.mapred.InputSplit>) 
-        factory.getDeserializer(cls);
-    deserializer.open(inFile);
-    org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
-    long pos = inFile.getPos();
-    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
-        .increment(pos - offset);
-    inFile.close();
-    return split;
-  }
-
-  @SuppressWarnings("unchecked")
-  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
-      TaskSplitIndex splitMetaInfo) throws IOException {
-    Path file = new Path(splitMetaInfo.getSplitLocation());
-    long offset = splitMetaInfo.getStartOffset();
-    
-    // Split information read from local filesystem.
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    file = fs.makeQualified(file);
-    LOG.info("Reading input split file from : " + file);
-    FSDataInputStream inFile = fs.open(file);
-    inFile.seek(offset);
-    String className = Text.readString(inFile);
-    Class<org.apache.hadoop.mapreduce.InputSplit> cls;
-    try {
-      cls = 
-          (Class<org.apache.hadoop.mapreduce.InputSplit>) 
-          jobConf.getClassByName(className);
-    } catch (ClassNotFoundException ce) {
-      IOException wrap = new IOException("Split class " + className + 
-          " not found");
-      wrap.initCause(ce);
-      throw wrap;
-    }
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = 
-        (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) 
-        factory.getDeserializer(cls);
-    deserializer.open(inFile);
-    org.apache.hadoop.mapreduce.InputSplit split = 
-        deserializer.deserialize(null);
-    long pos = inFile.getPos();
-    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
-        .increment(pos - offset);
-    inFile.close();
-    return split;
-  }
-
-  private void setIncrementalConfigParams(InputSplit inputSplit) {
-    if (inputSplit instanceof FileSplit) {
-      FileSplit fileSplit = (FileSplit) inputSplit;
-      this.incrementalConf = new Configuration(false);
-
-      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
-          .toString());
-      this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
-          fileSplit.getStart());
-      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
-          fileSplit.getLength());
-    }
-    LOG.info("Processing split: " + inputSplit);
-  }
-
-  private long getInputBytes() {
-    if (fsStats == null) return 0;
-    long bytesRead = 0;
-    for (Statistics stat: fsStats) {
-      bytesRead = bytesRead + stat.getBytesRead();
-    }
-    return bytesRead;
-  }
-
-  protected TaskSplitMetaInfo[] readSplits(Configuration conf)
-      throws IOException {
-    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
-    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
-        FileSystem.getLocal(conf));
-    return allTaskSplitMetaInfo;
-  }
-  
-  private class MRInputKVReader implements KVReader {
-    
-    Object key;
-    Object value;
-
-    private SimpleValueIterator valueIterator = new SimpleValueIterator();
-    private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
-
-    private final boolean localNewApi;
-    
-    MRInputKVReader() {
-      localNewApi = useNewApi;
-      if (!localNewApi) {
-        key = oldRecordReader.createKey();
-        value =oldRecordReader.createValue();
-      }
-    }
-    
-    // Setup the values iterator once, and set value on the same object each time
-    // to prevent lots of objects being created.
-
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public boolean next() throws IOException {
-      boolean hasNext = false;
-      long bytesInPrev = getInputBytes();
-      if (localNewApi) {
-        try {
-          hasNext = newRecordReader.nextKeyValue();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while checking for next key-value", e);
-        }
-      } else {
-        hasNext = oldRecordReader.next(key, value);
-      }
-      long bytesInCurr = getInputBytes();
-      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-      
-      if (hasNext) {
-        inputRecordCounter.increment(1);
-      }
-      
-      return hasNext;
-    }
-
-    @Override
-    public KVRecord getCurrentKV() throws IOException {
-      KVRecord kvRecord = null;
-      if (localNewApi) {
-        try {
-          valueIterator.setValue(newRecordReader.getCurrentValue());
-          kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while fetching next key-value", e);
-        }
-        
-      } else {
-        valueIterator.setValue(value);
-        kvRecord = new KVRecord(key, valueIterable);
-      }
-      return kvRecord;
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
deleted file mode 100644
index 4e61aa7..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.input;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.mapred.RecordReader;
-
-public class SimpleInputLegacy extends SimpleInput {
-
-  @Private
-  public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
-    return this.newInputSplit;
-  }  
-  
-  @SuppressWarnings("rawtypes")
-  @Private
-  public RecordReader getOldRecordReader() {
-    return this.oldRecordReader;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
new file mode 100644
index 0000000..e6bdbe6
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -0,0 +1,326 @@
+package org.apache.tez.mapreduce.output;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Event;
+import org.apache.tez.engine.api.KVWriter;
+import org.apache.tez.engine.api.LogicalOutput;
+import org.apache.tez.engine.api.TezOutputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+public class MROutput implements LogicalOutput {
+
+  private static final Log LOG = LogFactory.getLog(MROutput.class);
+
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  private TezOutputContext outputContext;
+  private JobConf jobConf;
+  boolean useNewApi;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
+
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
+
+  private TezCounter outputRecordCounter;
+  private TezCounter fileOutputByteCounter;
+  private List<Statistics> fsStats;
+
+  private TaskAttemptContext newApiTaskAttemptContext;
+  private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
+
+  private boolean isMapperOutput;
+
+  private OutputCommitter committer;
+
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException, InterruptedException {
+    LOG.info("Initializing Simple Output");
+    this.outputContext = outputContext;
+    Configuration conf = TezUtils.createConfFromUserPayload(
+        outputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
+    this.useNewApi = this.jobConf.getUseNewMapper();
+    this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
+        false);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+        outputContext.getDAGAttemptNumber());
+
+    outputRecordCounter = outputContext.getCounters().findCounter(
+        TaskCounter.MAP_OUTPUT_RECORDS);
+    fileOutputByteCounter = outputContext.getCounters().findCounter(
+        FileOutputFormatCounter.BYTES_WRITTEN);
+
+    if (useNewApi) {
+      newApiTaskAttemptContext = createTaskAttemptContext();
+      try {
+        newOutputFormat =
+            ReflectionUtils.newInstance(
+                newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+
+      List<Statistics> matchedStats = null;
+      if (newOutputFormat instanceof
+          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats =
+            Utils.getFsStatistics(
+                org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+                    .getOutputPath(newApiTaskAttemptContext),
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes();
+      try {
+        newRecordWriter =
+            newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while creating record writer", e);
+      }
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    } else {
+      TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
+          outputContext.getApplicationId().getClusterTimestamp()),
+          outputContext.getApplicationId().getId(),
+          (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
+          outputContext.getTaskIndex()),
+          outputContext.getTaskAttemptNumber());
+      jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+      jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+      jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+      jobConf.setInt(JobContext.TASK_PARTITION,
+          taskAttemptId.getTaskID().getId());
+      jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
+      oldApiTaskAttemptContext =
+          new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
+              jobConf, taskAttemptId,
+              new MRTaskReporter(outputContext));
+      oldOutputFormat = jobConf.getOutputFormat();
+
+      List<Statistics> matchedStats = null;
+      if (oldOutputFormat
+          instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+        matchedStats =
+            Utils.getFsStatistics(
+                org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
+                    jobConf),
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      FileSystem fs = FileSystem.get(jobConf);
+      String finalName = getOutputName();
+
+      long bytesOutPrev = getOutputBytes();
+      oldRecordWriter =
+          oldOutputFormat.getRecordWriter(
+              fs, jobConf, finalName, new MRReporter(outputContext));
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+    initCommitter(jobConf, useNewApi);
+
+    LOG.info("Initialized Simple Output"
+        + ", using_new_api: " + useNewApi);
+    return null;
+  }
+
+  public void initCommitter(JobConf job, boolean useNewApi)
+      throws IOException, InterruptedException {
+
+    if (useNewApi) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("using new api for output committer");
+      }
+
+      OutputFormat<?, ?> outputFormat = null;
+      try {
+        outputFormat = ReflectionUtils.newInstance(
+            newApiTaskAttemptContext.getOutputFormatClass(), job);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException("Unknown OutputFormat", cnfe);
+      }
+      this.committer = outputFormat.getOutputCommitter(
+          newApiTaskAttemptContext);
+    } else {
+      this.committer = job.getOutputCommitter();
+    }
+
+    Path outputPath = FileOutputFormat.getOutputPath(job);
+    if (outputPath != null) {
+      if ((this.committer instanceof FileOutputCommitter)) {
+        FileOutputFormat.setWorkOutputPath(job,
+            ((FileOutputCommitter) this.committer).getTaskAttemptPath(
+                oldApiTaskAttemptContext));
+      } else {
+        FileOutputFormat.setWorkOutputPath(job, outputPath);
+      }
+    }
+    if (useNewApi) {
+      this.committer.setupTask(newApiTaskAttemptContext);
+    } else {
+      this.committer.setupTask(oldApiTaskAttemptContext);
+    }
+  }
+
+  public boolean isCommitRequired() throws IOException {
+    if (useNewApi) {
+      return committer.needsTaskCommit(newApiTaskAttemptContext);
+    } else {
+      return committer.needsTaskCommit(oldApiTaskAttemptContext);
+    }
+  }
+
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, outputContext,
+        isMapperOutput);
+  }
+
+  private long getOutputBytes() {
+    if (fsStats == null) return 0;
+    long bytesWritten = 0;
+    for (Statistics stat: fsStats) {
+      bytesWritten = bytesWritten + stat.getBytesWritten();
+    }
+    return bytesWritten;
+  }
+
+  private String getOutputName() {
+    return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
+  }
+
+  @Override
+  public KVWriter getWriter() throws IOException {
+    return new KVWriter() {
+      private final boolean useNewWriter = useNewApi;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        long bytesOutPrev = getOutputBytes();
+        if (useNewWriter) {
+          try {
+            newRecordWriter.write(key, value);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while writing next key-value",e);
+          }
+        } else {
+          oldRecordWriter.write(key, value);
+        }
+
+        long bytesOutCurr = getOutputBytes();
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+        outputRecordCounter.increment(1);
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // Not expecting any events at the moment.
+  }
+
+  @Override
+  public synchronized List<Event> close() throws IOException {
+    if (closed.getAndSet(true)) {
+      return null;
+    }
+
+    LOG.info("Closing Simple Output");
+    long bytesOutPrev = getOutputBytes();
+    if (useNewApi) {
+      try {
+        newRecordWriter.close(newApiTaskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while closing record writer", e);
+      }
+    } else {
+      oldRecordWriter.close(null);
+    }
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    LOG.info("Closed Simple Output");
+    return null;
+  }
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    // Nothing to do for now
+  }
+
+  /**
+   * MROutput expects the Processor to call commit() before the
+   * Processor completes.
+   * @throws IOException if closing the writer or committing the task fails
+   */
+  public void commit() throws IOException {
+    close();
+    if (useNewApi) {
+      committer.commitTask(newApiTaskAttemptContext);
+    } else {
+      committer.commitTask(oldApiTaskAttemptContext);
+    }
+  }
+
+
+  /**
+   * MROutput expects the Processor to call abort() on any error
+   * (including an error during commit) before the Processor completes.
+   * @throws IOException if closing the writer or aborting the task fails
+   */
+  public void abort() throws IOException {
+    close();
+    if (useNewApi) {
+      committer.abortTask(newApiTaskAttemptContext);
+    } else {
+      committer.abortTask(oldApiTaskAttemptContext);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
deleted file mode 100644
index d82c9e2..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
+++ /dev/null
@@ -1,326 +0,0 @@
-package org.apache.tez.mapreduce.output;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
-import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
-import org.apache.tez.mapreduce.processor.MRTaskReporter;
-
-public class SimpleOutput implements LogicalOutput {
-
-  private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  private TezOutputContext outputContext;
-  private JobConf jobConf;
-  boolean useNewApi;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
-
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
-
-  private TezCounter outputRecordCounter;
-  private TezCounter fileOutputByteCounter;
-  private List<Statistics> fsStats;
-
-  private TaskAttemptContext newApiTaskAttemptContext;
-  private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
-
-  private boolean isMapperOutput;
-
-  private OutputCommitter committer;
-
-  @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws IOException, InterruptedException {
-    LOG.info("Initializing Simple Output");
-    this.outputContext = outputContext;
-    Configuration conf = TezUtils.createConfFromUserPayload(
-        outputContext.getUserPayload());
-    this.jobConf = new JobConf(conf);
-    this.useNewApi = this.jobConf.getUseNewMapper();
-    this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
-        false);
-    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        outputContext.getDAGAttemptNumber());
-
-    outputRecordCounter = outputContext.getCounters().findCounter(
-        TaskCounter.MAP_OUTPUT_RECORDS);
-    fileOutputByteCounter = outputContext.getCounters().findCounter(
-        FileOutputFormatCounter.BYTES_WRITTEN);
-
-    if (useNewApi) {
-      newApiTaskAttemptContext = createTaskAttemptContext();
-      try {
-        newOutputFormat =
-            ReflectionUtils.newInstance(
-                newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
-      }
-
-      List<Statistics> matchedStats = null;
-      if (newOutputFormat instanceof
-          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
-        matchedStats =
-            Utils.getFsStatistics(
-                org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-                    .getOutputPath(newApiTaskAttemptContext),
-                jobConf);
-      }
-      fsStats = matchedStats;
-
-      long bytesOutPrev = getOutputBytes();
-      try {
-        newRecordWriter =
-            newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while creating record writer", e);
-      }
-      long bytesOutCurr = getOutputBytes();
-      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-    } else {
-      TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
-          outputContext.getApplicationId().getClusterTimestamp()),
-          outputContext.getApplicationId().getId(),
-          (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
-          outputContext.getTaskIndex()),
-          outputContext.getTaskAttemptNumber());
-      jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
-      jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
-      jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
-      jobConf.setInt(JobContext.TASK_PARTITION,
-          taskAttemptId.getTaskID().getId());
-      jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
-
-      oldApiTaskAttemptContext =
-          new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
-              jobConf, taskAttemptId,
-              new MRTaskReporter(outputContext));
-      oldOutputFormat = jobConf.getOutputFormat();
-
-      List<Statistics> matchedStats = null;
-      if (oldOutputFormat
-          instanceof org.apache.hadoop.mapred.FileOutputFormat) {
-        matchedStats =
-            Utils.getFsStatistics(
-                org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
-                    jobConf),
-                jobConf);
-      }
-      fsStats = matchedStats;
-
-      FileSystem fs = FileSystem.get(jobConf);
-      String finalName = getOutputName();
-
-      long bytesOutPrev = getOutputBytes();
-      oldRecordWriter =
-          oldOutputFormat.getRecordWriter(
-              fs, jobConf, finalName, new MRReporter(outputContext));
-      long bytesOutCurr = getOutputBytes();
-      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-    }
-    initCommitter(jobConf, useNewApi);
-
-    LOG.info("Initialized Simple Output"
-        + ", using_new_api: " + useNewApi);
-    return null;
-  }
-
-  public void initCommitter(JobConf job, boolean useNewApi)
-      throws IOException, InterruptedException {
-
-    if (useNewApi) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("using new api for output committer");
-      }
-
-      OutputFormat<?, ?> outputFormat = null;
-      try {
-        outputFormat = ReflectionUtils.newInstance(
-            newApiTaskAttemptContext.getOutputFormatClass(), job);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException("Unknown OutputFormat", cnfe);
-      }
-      this.committer = outputFormat.getOutputCommitter(
-          newApiTaskAttemptContext);
-    } else {
-      this.committer = job.getOutputCommitter();
-    }
-
-    Path outputPath = FileOutputFormat.getOutputPath(job);
-    if (outputPath != null) {
-      if ((this.committer instanceof FileOutputCommitter)) {
-        FileOutputFormat.setWorkOutputPath(job,
-            ((FileOutputCommitter) this.committer).getTaskAttemptPath(
-                oldApiTaskAttemptContext));
-      } else {
-        FileOutputFormat.setWorkOutputPath(job, outputPath);
-      }
-    }
-    if (useNewApi) {
-      this.committer.setupTask(newApiTaskAttemptContext);
-    } else {
-      this.committer.setupTask(oldApiTaskAttemptContext);
-    }
-  }
-
-  public boolean isCommitRequired() throws IOException {
-    if (useNewApi) {
-      return committer.needsTaskCommit(newApiTaskAttemptContext);
-    } else {
-      return committer.needsTaskCommit(oldApiTaskAttemptContext);
-    }
-  }
-
-  private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, outputContext,
-        isMapperOutput);
-  }
-
-  private long getOutputBytes() {
-    if (fsStats == null) return 0;
-    long bytesWritten = 0;
-    for (Statistics stat: fsStats) {
-      bytesWritten = bytesWritten + stat.getBytesWritten();
-    }
-    return bytesWritten;
-  }
-
-  private String getOutputName() {
-    return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
-  }
-
-  @Override
-  public KVWriter getWriter() throws IOException {
-    return new KVWriter() {
-      private final boolean useNewWriter = useNewApi;
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public void write(Object key, Object value) throws IOException {
-        long bytesOutPrev = getOutputBytes();
-        if (useNewWriter) {
-          try {
-            newRecordWriter.write(key, value);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted while writing next key-value",e);
-          }
-        } else {
-          oldRecordWriter.write(key, value);
-        }
-
-        long bytesOutCurr = getOutputBytes();
-        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-        outputRecordCounter.increment(1);
-      }
-    };
-  }
-
-  @Override
-  public void handleEvents(List<Event> outputEvents) {
-    // Not expecting any events at the moment.
-  }
-
-  @Override
-  public synchronized List<Event> close() throws IOException {
-    if (closed.getAndSet(true)) {
-      return null;
-    }
-
-    LOG.info("Closing Simple Output");
-    long bytesOutPrev = getOutputBytes();
-    if (useNewApi) {
-      try {
-        newRecordWriter.close(newApiTaskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while closing record writer", e);
-      }
-    } else {
-      oldRecordWriter.close(null);
-    }
-    long bytesOutCurr = getOutputBytes();
-    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-    LOG.info("Closed Simple Output");
-    return null;
-  }
-
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    // Nothing to do for now
-  }
-
-  /**
-   * SimpleOutput expects that a Processor call commit prior to the
-   * Processor's completion
-   * @throws IOException
-   */
-  public void commit() throws IOException {
-    close();
-    if (useNewApi) {
-      committer.commitTask(newApiTaskAttemptContext);
-    } else {
-      committer.commitTask(oldApiTaskAttemptContext);
-    }
-  }
-
-
-  /**
-   * SimpleOutput expects that a Processor call abort in case of any error
-   * ( including an error during commit ) prior to the Processor's completion
-   * @throws IOException
-   */
-  public void abort() throws IOException {
-    close();
-    if (useNewApi) {
-      committer.abortTask(newApiTaskAttemptContext);
-    } else {
-      committer.abortTask(oldApiTaskAttemptContext);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index f7404d4..fac1454 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -81,7 +81,7 @@ import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
 
 @SuppressWarnings("deprecation")
 public abstract class MRTask {
@@ -423,8 +423,8 @@ public abstract class MRTask {
         + " And is in the process of committing");
     // TODO change this to use the new context
     // TODO TEZ Interaciton between Commit and OutputReady. Merge ?
-    if (output instanceof SimpleOutput) {
-      SimpleOutput sOut = (SimpleOutput)output;
+    if (output instanceof MROutput) {
+      MROutput sOut = (MROutput)output;
       if (sOut.isCommitRequired()) {
         //wait for commit approval and commit
         // TODO EVENTUALLY - Commit is not required for map tasks.
@@ -458,7 +458,7 @@ public abstract class MRTask {
     statusUpdate();
   }
 
-  private void commit(SimpleOutput output) throws IOException {
+  private void commit(MROutput output) throws IOException {
     int retries = 3;
     while (true) {
       // This will loop till the AM asks for the task to be killed. As
@@ -495,7 +495,7 @@ public abstract class MRTask {
   }
 
   private
-  void discardOutput(SimpleOutput output) {
+  void discardOutput(MROutput output) {
     try {
       output.abort();
     } catch (IOException ioe)  {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 22312f7..85139ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -65,7 +65,7 @@ public class MRTaskReporter
     if (isProcessorContext) {
       ((TezProcessorContext)context).setProgress(progress);
     } else {
-      // TODO FIXME NEWTEZ - will simpleoutput's reporter use this api?
+      // TODO FIXME NEWTEZ - will MROutput's reporter use this api?
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 2084146..e4b990a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -44,9 +44,9 @@ import org.apache.tez.engine.api.LogicalOutput;
 import org.apache.tez.engine.api.TezProcessorContext;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
-import org.apache.tez.mapreduce.input.SimpleInput;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
@@ -99,15 +99,15 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
     LogicalOutput out = outputs.values().iterator().next();
 
     // Sanity check
-    if (!(in instanceof SimpleInputLegacy)) {
+    if (!(in instanceof MRInputLegacy)) {
       throw new IOException(new TezException(
           "Only Simple Input supported. Input: " + in.getClass()));
     }
-    SimpleInputLegacy input = (SimpleInputLegacy)in;
+    MRInputLegacy input = (MRInputLegacy)in;
 
     KVWriter kvWriter = null;
     if (!(out instanceof OnFileSortedOutput)) {
-      kvWriter = ((SimpleOutput)out).getWriter();
+      kvWriter = ((MROutput)out).getWriter();
     } else {
       kvWriter = ((OnFileSortedOutput)out).getWriter();
     }
@@ -124,13 +124,13 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
   void runOldMapper(
       final JobConf job,
       final MRTaskReporter reporter,
-      final SimpleInputLegacy input,
+      final MRInputLegacy input,
       final KVWriter output
       ) throws IOException, InterruptedException {
 
     // Initialize input in-line since it sets parameters which may be used by the processor.
-    // Done only for SimpleInput.
-    // TODO use new method in SimpleInput to get required info
+    // Done only for MRInput.
+    // TODO use new method in MRInput to get required info
     //input.initialize(job, master);
 
     RecordReader in = new OldRecordReader(input);
@@ -147,13 +147,13 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
 
   private void runNewMapper(final JobConf job,
       MRTaskReporter reporter,
-      final SimpleInputLegacy in,
+      final MRInputLegacy in,
       KVWriter out
       ) throws IOException, InterruptedException {
 
     // Initialize input in-line since it sets parameters which may be used by the processor.
-    // Done only for SimpleInput.
-    // TODO use new method in SimpleInput to get required info
+    // Done only for MRInput.
+    // TODO use new method in MRInput to get required info
     //in.initialize(job, master);
 
     // make a task context so we can get the classes
@@ -197,10 +197,10 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
 
   private static class NewRecordReader extends
       org.apache.hadoop.mapreduce.RecordReader {
-    private final SimpleInput in;
+    private final MRInput in;
     private KVReader reader;
 
-    private NewRecordReader(SimpleInput in) throws IOException {
+    private NewRecordReader(MRInput in) throws IOException {
       this.in = in;
       this.reader = in.getReader();
     }
@@ -241,38 +241,38 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
   }
 
   private static class OldRecordReader implements RecordReader {
-    private final SimpleInputLegacy simpleInput;
+    private final MRInputLegacy mrInput;
 
-    private OldRecordReader(SimpleInputLegacy simpleInput) {
-      this.simpleInput = simpleInput;
+    private OldRecordReader(MRInputLegacy mrInput) {
+      this.mrInput = mrInput;
     }
 
     @Override
     public boolean next(Object key, Object value) throws IOException {
       // TODO broken
-//      simpleInput.setKey(key);
-//      simpleInput.setValue(value);
+//      mrInput.setKey(key);
+//      mrInput.setValue(value);
 //      try {
-//        return simpleInput.hasNext();
+//        return mrInput.hasNext();
 //      } catch (InterruptedException ie) {
 //        throw new IOException(ie);
 //      }
-      return simpleInput.getOldRecordReader().next(key, value);
+      return mrInput.getOldRecordReader().next(key, value);
     }
 
     @Override
     public Object createKey() {
-      return simpleInput.getOldRecordReader().createKey();
+      return mrInput.getOldRecordReader().createKey();
     }
 
     @Override
     public Object createValue() {
-      return simpleInput.getOldRecordReader().createValue();
+      return mrInput.getOldRecordReader().createValue();
     }
 
     @Override
     public long getPos() throws IOException {
-      return simpleInput.getOldRecordReader().getPos();
+      return mrInput.getOldRecordReader().getPos();
     }
 
     @Override
@@ -282,7 +282,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
     @Override
     public float getProgress() throws IOException {
       try {
-        return simpleInput.getProgress();
+        return mrInput.getProgress();
       } catch (InterruptedException ie) {
         throw new IOException(ie);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 9274765..19acb39 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -48,7 +48,7 @@ import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
@@ -133,8 +133,8 @@ implements LogicalIOProcessor {
     KVReader kvReader = shuffleInput.getReader();
 
     KVWriter kvWriter = null;
-    if((out instanceof SimpleOutput)) {
-      kvWriter = ((SimpleOutput) out).getWriter();
+    if((out instanceof MROutput)) {
+      kvWriter = ((MROutput) out).getWriter();
     } else if ((out instanceof OnFileSortedOutput)) {
       kvWriter = ((OnFileSortedOutput) out).getWriter();
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 06e2f4b..89292ab 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -48,7 +48,7 @@ import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.junit.After;
@@ -120,7 +120,7 @@ public class TestMapProcessor {
     
     MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
 
     LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,
@@ -191,7 +191,7 @@ public class TestMapProcessor {
 //            localFs, workDir, job, 0, new Path(workDir, "map0"), 
 //            new TestUmbilicalProtocol(true), vertexName, 
 //            Collections.singletonList(new InputSpec("NullVertex", 0,
-//                SimpleInput.class.getName())),
+//                MRInput.class.getName())),
 //            Collections.singletonList(new OutputSpec("FakeVertex", 1,
 //                OldInMemorySortedOutput.class.getName()))
 //            );

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5d86b935/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index a3abd76..274c353 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -54,8 +54,8 @@ import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.junit.After;
@@ -125,7 +125,7 @@ public class TestReduceProcessor {
     Path mapInput = new Path(workDir, "map0");
     MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
     // Run a map
     LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,
@@ -152,7 +152,7 @@ public class TestReduceProcessor {
         ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
     
     InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
-    OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1);
+    OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(MROutput.class.getName()), 1);
     
     // Now run a reduce
     TaskSpec taskSpec = new TaskSpec(