You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/13 03:52:37 UTC

git commit: TEZ-637. [MR Support] Add all required info into JobConf for MR related I/O/P (bikas)

Updated Branches:
  refs/heads/master ac471ada3 -> 924ce6611


TEZ-637. [MR Support] Add all required info into JobConf for MR related I/O/P (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/924ce661
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/924ce661
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/924ce661

Branch: refs/heads/master
Commit: 924ce6611b5e6cfc5be6d6fc2da794ded4304180
Parents: ac471ad
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 12 18:52:31 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 12 18:52:31 2014 -0800

----------------------------------------------------------------------
 .../tez/mapreduce/examples/MRRSleepJob.java     |  2 +-
 .../mapreduce/examples/OrderedWordCount.java    |  2 +-
 .../apache/tez/mapreduce/client/YARNRunner.java |  2 +-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 10 +++-
 .../org/apache/tez/mapreduce/input/MRInput.java |  2 +-
 .../tez/mapreduce/input/MRInputLegacy.java      |  7 +++
 .../apache/tez/mapreduce/output/MROutput.java   |  2 +-
 .../tez/mapreduce/output/MROutputLegacy.java    | 28 ++++++++++
 .../apache/tez/mapreduce/processor/MRTask.java  | 49 +++++++++++------
 .../mapreduce/processor/map/MapProcessor.java   | 57 ++++++++++++++++----
 .../processor/reduce/ReduceProcessor.java       | 14 ++---
 .../processor/reduce/TestReduceProcessor.java   |  9 ++--
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  2 +-
 13 files changed, 144 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 4353fb1..5f3b184 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -650,7 +650,7 @@ public class MRRSleepJob extends Configured implements Tool {
       Map<String, String> reduceEnv = new HashMap<String, String>();
       MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
       finalReduceVertex.setTaskEnvironment(reduceEnv);
-      MRHelpers.addMROutput(finalReduceVertex, reducePayload);
+      MRHelpers.addMROutputLegacy(finalReduceVertex, reducePayload);
       vertices.add(finalReduceVertex);
     } else {
       // Map only job

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 2000668..3b5d573 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -273,7 +273,7 @@ public class OrderedWordCount {
     Map<String, String> reduceEnv = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
     finalReduceVertex.setTaskEnvironment(reduceEnv);
-    MRHelpers.addMROutput(finalReduceVertex, finalReducePayload);
+    MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
     vertices.add(finalReduceVertex);
 
     DAG dag = new DAG("OrderedWordCount" + dagIndex);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 2102a1c..49057d3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -414,7 +414,7 @@ public class YARNRunner implements ClientProtocol {
     }
     // Map only jobs.
     if (stageNum == totalStages -1) {
-      MRHelpers.addMROutput(vertex, vertexUserPayload);
+      MRHelpers.addMROutputLegacy(vertex, vertexUserPayload);
     }
 
     Map<String, String> taskEnv = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index eaa597a..a01503c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -74,6 +74,7 @@ import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
@@ -952,7 +953,14 @@ public class MRHelpers {
         .setUserPayload(userPayload);
     vertex.addOutput("MROutput", od, MROutputCommitter.class);
   }
-  
+
+  @Private
+  public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) {
+    OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
+        .setUserPayload(userPayload);
+    vertex.addOutput("MROutput", od, MROutputCommitter.class);
+  }
+
   @SuppressWarnings("unchecked")
   public static InputSplit createOldFormatSplitFromUserPayload(
       MRSplitProto splitProto, SerializationFactory serializationFactory)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 70003ef..f8f99b7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -105,7 +105,7 @@ public class MRInput implements LogicalInput {
   private InputFormat oldInputFormat;
   @SuppressWarnings("rawtypes")
   protected RecordReader oldRecordReader;
-  private InputSplit oldInputSplit;
+  protected InputSplit oldInputSplit;
 
   protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 5c2e515..9419aae 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
@@ -49,6 +50,12 @@ public class MRInputLegacy extends MRInput {
     return this.newInputSplit;
   }  
   
+
+  @Private
+  public InputSplit getOldInputSplit() {
+    return this.oldInputSplit;
+  }
+  
   @SuppressWarnings("rawtypes")
   @Private
   public RecordReader getOldRecordReader() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index efa18b4..66d4566 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -84,7 +84,7 @@ public class MROutput implements LogicalOutput {
 
   private boolean isMapperOutput;
 
-  private OutputCommitter committer;
+  protected OutputCommitter committer;
 
   @Override
   public List<Event> initialize(TezOutputContext outputContext)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
new file mode 100644
index 0000000..7c9f804
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.output;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+
+public class MROutputLegacy extends MROutput {
+
+  public OutputCommitter getOutputCommitter() {
+    return committer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 87c7912..242b798 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
@@ -77,7 +79,8 @@ import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.common.Constants;
@@ -102,6 +105,9 @@ public abstract class MRTask {
   protected TaskAttemptID taskAttemptId;
   protected Progress progress = new Progress();
   protected SecretKey jobTokenSecret;
+  
+  LogicalInput input;
+  LogicalOutput output;
 
   boolean isMap;
 
@@ -334,8 +340,13 @@ public abstract class MRTask {
     return this.processorContext;
   }
 
-  public void initTask() throws IOException,
+  public void initTask(LogicalOutput output) throws IOException,
                                 InterruptedException {
+    // By this time output has been initialized
+    this.output = output;
+    if (output instanceof MROutputLegacy) {
+      committer = ((MROutputLegacy)output).getOutputCommitter();
+    }
     this.mrReporter = new MRTaskReporter(processorContext);
     this.useNewApi = jobConf.getUseNewMapper();
     TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getTaskID()
@@ -366,14 +377,6 @@ public abstract class MRTask {
     return null;
   }
 
-  public OutputCommitter getCommitter() {
-    return committer;
-  }
-
-  public void setCommitter(OutputCommitter committer) {
-    this.committer = committer;
-  }
-
   public TezCounters getCounters() { return counters; }
 
   public void setConf(JobConf jobConf) {
@@ -415,15 +418,15 @@ public abstract class MRTask {
       InterruptedException {
   }
 
-  public void done(LogicalOutput output) throws IOException, InterruptedException {
+  public void done() throws IOException, InterruptedException {
     updateCounters();
 
     LOG.info("Task:" + taskAttemptId + " is done."
         + " And is in the process of committing");
     // TODO change this to use the new context
     // TODO TEZ Interaciton between Commit and OutputReady. Merge ?
-    if (output instanceof MROutput) {
-      MROutput sOut = (MROutput)output;
+    if (output instanceof MROutputLegacy) {
+      MROutputLegacy sOut = (MROutputLegacy)output;
       if (sOut.isCommitRequired()) {
         //wait for commit approval and commit
         // TODO EVENTUALLY - Commit is not required for map tasks.
@@ -456,7 +459,7 @@ public abstract class MRTask {
     statusUpdate();
   }
 
-  private void commit(MROutput output) throws IOException {
+  private void commit(MROutputLegacy output) throws IOException {
     int retries = 3;
     while (true) {
       // This will loop till the AM asks for the task to be killed. As
@@ -493,7 +496,7 @@ public abstract class MRTask {
   }
 
   private
-  void discardOutput(MROutput output) {
+  void discardOutput(MROutputLegacy output) {
     try {
       output.abort();
     } catch (IOException ioe)  {
@@ -657,7 +660,9 @@ public abstract class MRTask {
     statusUpdate();
     LOG.info("Runnning cleanup for the task");
     // do the cleanup
-    committer.abortTask(taskAttemptContext);
+    if (output instanceof MROutputLegacy) {
+      ((MROutputLegacy) output).abort();
+    }
   }
 
   public void localizeConfiguration(JobConf jobConf)
@@ -667,6 +672,18 @@ public abstract class MRTask {
     jobConf.setInt(JobContext.TASK_PARTITION,
         taskAttemptId.getTaskID().getId());
     jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+    
+    jobConf.setBoolean(MRJobConfig.TASK_ISMAP, isMap);
+    
+    Path outputPath = FileOutputFormat.getOutputPath(jobConf);
+    if (outputPath != null) {
+      if ((committer instanceof FileOutputCommitter)) {
+        FileOutputFormat.setWorkOutputPath(jobConf, 
+          ((FileOutputCommitter)committer).getTaskAttemptPath(taskAttemptContext));
+      } else {
+        FileOutputFormat.setWorkOutputPath(jobConf, outputPath);
+      }
+    }
   }
 
   public abstract TezCounter getOutputRecordsCounter();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 164818c..00a054e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -25,12 +25,15 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.MapRunnable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -40,7 +43,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
@@ -89,8 +92,6 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
 
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
 
-    initTask();
-
     if (inputs.size() != 1
         || outputs.size() != 1) {
       throw new IOException("Cannot handle multiple inputs or outputs"
@@ -100,10 +101,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
     LogicalInput in = inputs.values().iterator().next();
     LogicalOutput out = outputs.values().iterator().next();
 
+    initTask(out);
+
     // Sanity check
     if (!(in instanceof MRInputLegacy)) {
       throw new IOException(new TezException(
-          "Only Simple Input supported. Input: " + in.getClass()));
+          "Only MRInputLegacy supported. Input: " + in.getClass()));
     }
     MRInputLegacy input = (MRInputLegacy)in;
     input.init();
@@ -115,10 +118,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
     }
 
     KeyValueWriter kvWriter = null;
-    if (!(out instanceof OnFileSortedOutput)) {
-      kvWriter = ((MROutput)out).getWriter();
-    } else {
+    if ((out instanceof MROutputLegacy)) {
+      kvWriter = ((MROutputLegacy)out).getWriter();
+    } else if ((out instanceof OnFileSortedOutput)){
       kvWriter = ((OnFileSortedOutput)out).getWriter();
+    } else {
+      throw new IOException("Illegal output to map: " + out.getClass());
     }
 
     if (useNewApi) {
@@ -127,7 +132,35 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
       runOldMapper(jobConf, mrReporter, input, kvWriter);
     }
 
-    done(out);
+    done();
+  }
+  
+
+  /**
+   * Copies a mapred FileSplit's path, start and length into the job; every split is logged.
+   * @param job the job configuration to update; not modified when the split is not a FileSplit
+   * @param inputSplit the old-API (mapred) split being processed
+   */
+  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
+      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength()); // NOTE(review): length is stored under MAP_INPUT_PATH - confirm this constant is the input-length key
+    }
+    LOG.info("Processing mapred split: " + inputSplit);
+  }
+  
+  private void updateJobWithSplit(
+      final JobConf job, org.apache.hadoop.mapreduce.InputSplit inputSplit) {
+    if (inputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+      org.apache.hadoop.mapreduce.lib.input.FileSplit fileSplit = 
+          (org.apache.hadoop.mapreduce.lib.input.FileSplit) inputSplit;
+      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
+      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
+    }
+    LOG.info("Processing mapreduce split: " + inputSplit);
   }
 
   void runOldMapper(
@@ -141,6 +174,10 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
     // Done only for MRInput.
     // TODO use new method in MRInput to get required info
     //input.initialize(job, master);
+    
+    InputSplit inputSplit = input.getOldInputSplit();
+    
+    updateJobWithSplit(job, inputSplit);
 
     RecordReader in = new OldRecordReader(input);
 
@@ -188,13 +225,15 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
         new NewOutputCollector(out);
 
     org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
+    
+    updateJobWithSplit(job, split);
 
     org.apache.hadoop.mapreduce.MapContext
     mapContext =
     new MapContextImpl(
         job, taskAttemptId,
         input, output,
-        getCommitter(),
+        committer,
         processorContext, split, reporter);
 
     org.apache.hadoop.mapreduce.Mapper.Context mapperContext =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index ecda8c6..bf4f660 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
@@ -95,8 +95,6 @@ implements LogicalIOProcessor {
 
     LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
 
-    initTask();
-
     if (outputs.size() <= 0 || outputs.size() > 1) {
       throw new IOException("Invalid number of outputs"
           + ", outputCount=" + outputs.size());
@@ -110,6 +108,8 @@ implements LogicalIOProcessor {
     LogicalInput in = inputs.values().iterator().next();
     LogicalOutput out = outputs.values().iterator().next();
 
+    initTask(out);
+
     this.statusUpdate();
 
     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
@@ -133,12 +133,12 @@ implements LogicalIOProcessor {
     KeyValuesReader kvReader = shuffleInput.getReader();
 
     KeyValueWriter kvWriter = null;
-    if((out instanceof MROutput)) {
-      kvWriter = ((MROutput) out).getWriter();
+    if((out instanceof MROutputLegacy)) {
+      kvWriter = ((MROutputLegacy) out).getWriter();
     } else if ((out instanceof OnFileSortedOutput)) {
       kvWriter = ((OnFileSortedOutput) out).getWriter();
     } else {
-      throw new IOException("Illegal input to reduce: " + in.getClass());
+      throw new IOException("Illegal output to reduce: " + out.getClass());
     }
 
     if (useNewApi) {
@@ -157,7 +157,7 @@ implements LogicalIOProcessor {
           kvReader, comparator, keyClass, valueClass, kvWriter);
     }
 
-    done(out);
+    done();
   }
 
   void runOldReducer(JobConf job,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 2f2ec16..7868ecc 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -52,6 +52,7 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
@@ -161,9 +162,11 @@ public class TestReduceProcessor {
     ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
     
-    InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
-    OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(MROutput.class.getName()), 1);
-    
+    InputSpec reduceInputSpec = new InputSpec(mapVertexName,
+        new InputDescriptor(LocalMergedInput.class.getName()), 1);
+    OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex",
+        new OutputDescriptor(MROutputLegacy.class.getName()), 1);
+
     // Now run a reduce
     TaskSpec taskSpec = new TaskSpec(
         TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 990dbe6..bb71dfe 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -371,7 +371,7 @@ public class TestMRRJobsDAGApi {
     Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
         1, Resource.newInstance(256, 1));
-    MRHelpers.addMROutput(stage3Vertex, stage3Payload);
+    MRHelpers.addMROutputLegacy(stage3Vertex, stage3Payload);
 
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
     Map<String, String> commonEnv = createCommonEnv();