You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/21 00:44:21 UTC

git commit: TEZ-433. Change Combiner to work with new APIs (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 85a9d46e3 -> eb0f6ffe5


TEZ-433. Change Combiner to work with new APIs (part of TEZ-398).
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/eb0f6ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/eb0f6ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/eb0f6ffe

Branch: refs/heads/TEZ-398
Commit: eb0f6ffe5250b497f35909f8559682cbaa3621ae
Parents: 85a9d46
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 20 15:43:56 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 20 15:43:56 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |   5 +
 .../apache/tez/engine/common/ConfigUtils.java   |   4 +
 .../tez/engine/common/TezEngineUtils.java       |  39 +++
 .../tez/engine/common/ValuesIterator.java       |   2 +
 .../tez/engine/common/combine/Combiner.java     |  43 ++++
 .../common/shuffle/impl/MergeManager.java       |  41 +---
 .../tez/engine/common/shuffle/impl/Shuffle.java |  10 +-
 .../engine/common/sort/impl/ExternalSorter.java |  45 +---
 .../common/sort/impl/PipelinedSorter.java       |  15 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   4 +-
 .../engine/common/task/impl/ValuesIterator.java |   9 +-
 .../engine/lib/input/ShuffledMergedInput.java   |   1 -
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  13 +
 .../hadoop/MultiStageMRConfToTezTranslator.java |  19 +-
 .../tez/mapreduce/newcombine/MRCombiner.java    | 242 +++++++++++++++++++
 .../mapreduce/newpartition/MRPartitioner.java   |   3 +-
 .../mapreduce/newprocessor/MRTaskReporter.java  |   7 +
 17 files changed, 411 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 12c2b4b..7c4540c 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -128,6 +128,11 @@ public class TezJobConfig {
    */
   public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class";
   
+  /**
+   * Specifies a combiner class (primarily for Shuffle)
+   */
+  public static final String TEZ_ENGINE_COMBINER_CLASS = "tez.engine.combiner.class";
+  
   public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions";
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
index a92cf1b..f73adfd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
@@ -140,5 +140,9 @@ public class ConfigUtils {
 
     return ReflectionUtils.newInstance(theClass, conf);
   }
+  
+  public static boolean useNewApi(Configuration conf) {
+    return conf.getBoolean("mapred.mapper.new-api", false);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index f352e08..3920ce6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -29,9 +29,11 @@ import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.api.Partitioner;
+import org.apache.tez.engine.common.combine.Combiner;
 import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
 import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
 import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.TezTaskContext;
 
 public class TezEngineUtils {
 
@@ -55,6 +57,43 @@ public class TezEngineUtils {
   }
 
   @SuppressWarnings("unchecked")
+  public static Combiner instantiateCombiner(Configuration conf, TezTaskContext taskContext) throws IOException {
+    Class<? extends Combiner> clazz;
+    String className = conf.get(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS);
+    if (className == null) {
+      LOG.info("No combiner specified via " + TezJobConfig.TEZ_ENGINE_COMBINER_CLASS + ". Combiner will not be used");
+      return null;
+    }
+    LOG.info("Using Combiner class: " + className);
+    try {
+      clazz = (Class<? extends Combiner>) conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to load combiner class: " + className);
+    }
+    
+    Combiner combiner = null;
+    
+      Constructor<? extends Combiner> ctor;
+      try {
+        ctor = clazz.getConstructor(TezTaskContext.class);
+        combiner = ctor.newInstance(taskContext);
+      } catch (SecurityException e) {
+        throw new IOException(e);
+      } catch (NoSuchMethodException e) {
+        throw new IOException(e);
+      } catch (IllegalArgumentException e) {
+        throw new IOException(e);
+      } catch (InstantiationException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      } catch (InvocationTargetException e) {
+        throw new IOException(e);
+      }
+      return combiner;
+  }
+  
+  @SuppressWarnings("unchecked")
   public static Partitioner instantiatePartitioner(Configuration conf)
       throws IOException {
     Class<? extends Partitioner> clazz;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
index a33d00b..b7867aa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
@@ -101,6 +101,8 @@ public class ValuesIterator<KEY,VALUE> {
     return key; 
   }
   
+  // TODO NEWTEZ Maybe add another method which returns an iterator instead of iterable
+  
   public Iterable<VALUE> getValues() {
     return new Iterable<VALUE>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
new file mode 100644
index 0000000..6f86d61
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.combine;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ *<b>Combiner Initialization</b></p> The Combiner class is picked up
+ * using the TEZ_ENGINE_COMBINER_CLASS attribute in {@link TezJobConfig}
+ * 
+ * 
+ * Partitioners need to provide a single argument ({@link TezTaskContext})
+ * constructor.
+ */
+@Unstable
+@LimitedPrivate("mapreduce")
+public interface Combiner {
+  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
+      throws InterruptedException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index 093a293..ad9bb5f 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -48,6 +48,7 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.combine.Combiner;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
@@ -55,7 +56,6 @@ import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
 import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
 import org.apache.tez.engine.hadoop.compat.NullProgressable;
-import org.apache.tez.engine.newapi.Processor;
 import org.apache.tez.engine.newapi.TezInputContext;
 
 @InterfaceAudience.Private
@@ -72,7 +72,7 @@ public class MergeManager {
   
   private final  TezTaskOutputFiles mapOutputFile;
   private final Progressable nullProgressable = new NullProgressable();
-  private final Processor combineProcessor = null; // TODO NEWTEZ Fix CombineProcessor  
+  private final Combiner combiner;  
   
   Set<MapOutput> inMemoryMergedMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
@@ -98,12 +98,6 @@ public class MergeManager {
   private final ExceptionReporter exceptionReporter;
   
   private final TezInputContext inputContext;
-  
-  /**
-   * Combiner processor to run during in-memory merge, if defined.
-   */
-  // TODO NEWTEZ Fix Combiner
-  //private final Processor combineProcessor;
 
   private final TezCounter spilledRecordsCounter;
 
@@ -119,18 +113,18 @@ public class MergeManager {
                       FileSystem localFS,
                       LocalDirAllocator localDirAllocator,  
                       TezInputContext inputContext,
-                      Processor combineProcessor,
+                      Combiner combiner,
                       TezCounter spilledRecordsCounter,
                       TezCounter reduceCombineInputCounter,
                       TezCounter mergedMapOutputsCounter,
                       ExceptionReporter exceptionReporter) {
-    // TODO NEWTEZ Change to include Combiner
     this.inputContext = inputContext;
     this.conf = conf;
     this.localDirAllocator = localDirAllocator;
     this.exceptionReporter = exceptionReporter;
     
-    //this.combineProcessor = combineProcessor;
+    this.combiner = combiner;
+
     this.reduceCombineInputCounter = reduceCombineInputCounter;
     this.spilledRecordsCounter = spilledRecordsCounter;
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
@@ -370,27 +364,8 @@ public class MergeManager {
   }
    
   void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
-  throws IOException, InterruptedException {
-
-    // TODO NEWTEZ Fix CombineProcessor
-    
-//    CombineInput combineIn = new CombineInput(kvIter);
-//    combineIn.initialize(conf, reporter);
-//    
-//    CombineOutput combineOut = new CombineOutput(writer);
-//    combineOut.initialize(conf, reporter);
-//
-//    try {
-//      combineProcessor.process(new Input[] {combineIn},
-//          new Output[] {combineOut});
-//    } catch (IOException ioe) {
-//      try {
-//        combineProcessor.close();
-//      } catch (IOException ignoredException) {}
-//
-//      throw ioe;
-//    }
-  
+      throws IOException, InterruptedException {
+    combiner.combine(kvIter, writer);
   }
 
   private class IntermediateMemoryToMemoryMerger 
@@ -500,7 +475,7 @@ public class MergeManager {
             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
             nullProgressable, spilledRecordsCounter, null, null);
 
-        if (null == combineProcessor) {
+        if (null == combiner) {
           TezMerger.writeFile(rIter, writer, nullProgressable, conf);
         } else {
           runCombineProcessor(rIter, writer);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 620c620..f605b7c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -38,6 +38,8 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.combine.Combiner;
 import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.newapi.Event;
@@ -67,11 +69,10 @@ public class Shuffle implements ExceptionReporter {
   private final SecretKey jobTokenSecret;
   private AtomicInteger reduceRange = new AtomicInteger(
       TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
-  
+
   private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
 
   public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
-    // TODO NEWTEZ Handle Combiner
     this.inputContext = inputContext;
     this.conf = conf;
     this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
@@ -84,6 +85,8 @@ public class Shuffle implements ExceptionReporter {
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
     
+    Combiner combiner = TezEngineUtils.instantiateCombiner(conf, inputContext);
+    
     FileSystem localFS = FileSystem.getLocal(this.conf);
     LocalDirAllocator localDirAllocator = 
         new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
@@ -123,7 +126,7 @@ public class Shuffle implements ExceptionReporter {
           localFS,
           localDirAllocator,
           inputContext,
-          null, // TODO NEWTEZ Fix Combiner
+          combiner,
           spilledRecordsCounter,
           reduceCombineInputCounter,
           mergedMapOutputsCounter,
@@ -272,5 +275,4 @@ public class Shuffle implements ExceptionReporter {
       throw e; 
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 8b20192..1b5e015 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -43,15 +43,14 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.combine.Combiner;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
 import org.apache.tez.engine.hadoop.compat.NullProgressable;
 import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.records.OutputContext;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public abstract class ExternalSorter {
@@ -66,7 +65,7 @@ public abstract class ExternalSorter {
 
   protected Progressable nullProgressable = new NullProgressable();
   protected TezOutputContext outputContext;
-  protected Processor combineProcessor;
+  protected Combiner combiner;
   protected Partitioner partitioner;
   protected Configuration conf;
   protected FileSystem rfs;
@@ -84,9 +83,6 @@ public abstract class ExternalSorter {
   // Compression for map-outputs
   protected CompressionCodec codec;
 
-  // TODO NEWTEZ Setup CombineProcessor
-  // TODO NEWTEZ Setup Partitioner in SimpleOutput
-
   // Counters
   // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
   protected TezCounter mapOutputByteCounter;
@@ -139,12 +135,7 @@ public abstract class ExternalSorter {
     LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS) + "]");
     this.conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, this.partitions);
     this.partitioner = TezEngineUtils.instantiatePartitioner(this.conf);
-  }
-
-  // TODO NEWTEZ Add an interface (! Processor) for CombineProcessor, which MR tasks can initialize and set.
-  // Alternately add a config key with a classname, which is easy to initialize.
-  public void setCombiner(Processor combineProcessor) {
-    this.combineProcessor = combineProcessor;
+    this.combiner = TezEngineUtils.instantiateCombiner(this.conf, outputContext);
   }
 
   /**
@@ -165,27 +156,11 @@ public abstract class ExternalSorter {
 
   protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
       Writer writer) throws IOException {
-
-    // TODO NEWTEZ Fix Combiner.
-//    CombineInput combineIn = new CombineInput(kvIter);
-//    combineIn.initialize(job, runningTaskContext.getTaskReporter());
-//
-//    CombineOutput combineOut = new CombineOutput(writer);
-//    combineOut.initialize(job, runningTaskContext.getTaskReporter());
-//
-//    try {
-//      combineProcessor.process(new Input[] {combineIn},
-//          new Output[] {combineOut});
-//    } catch (IOException ioe) {
-//      try {
-//        combineProcessor.close();
-//      } catch (IOException ignored) {}
-//
-//      // Do not close output here as the sorter should close the combine output
-//
-//      throw ioe;
-//    }
-
+    try {
+      combiner.combine(kvIter, writer);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
   }
 
   /**
@@ -216,8 +191,4 @@ public abstract class ExternalSorter {
   public ShuffleHeader getShuffleHeader(int reduce) {
     throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
   }
-
-  public OutputContext getOutputContext() {
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
index bafbd4d..952568e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
@@ -45,10 +45,9 @@ import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.engine.newapi.TezOutputContext;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class PipelinedSorter extends ExternalSorter {
@@ -270,7 +269,7 @@ public class PipelinedSorter extends ExternalSorter {
           new Writer(conf, out, keyClass, valClass, codec,
               spilledRecordsCounter);
         writer.setRLE(merger.needsRLE());
-        if (combineProcessor == null) {
+        if (combiner == null) {
           while(kvIter.next()) {
             writer.append(kvIter.getKey(), kvIter.getValue());
           }
@@ -380,10 +379,10 @@ public class PipelinedSorter extends ExternalSorter {
           new Writer(conf, finalOut, keyClass, valClass, codec,
                            spilledRecordsCounter);
       writer.setRLE(merger.needsRLE());
-      if (combineProcessor == null || numSpills < minSpillsForCombine) {
+      if (combiner == null || numSpills < minSpillsForCombine) {
         TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
       } else {
-    	runCombineProcessor(kvIter, writer);
+        runCombineProcessor(kvIter, writer);
       }
 
       //close
@@ -930,10 +929,4 @@ public class PipelinedSorter extends ExternalSorter {
     }
 
   }
-
-  @Override
-  public OutputContext getOutputContext() {
-    return null;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
index b1e17e7..1ad31f7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
@@ -752,7 +752,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           long segmentStart = out.getPos();
           writer = new Writer(conf, out, keyClass, valClass, codec,
                                     spilledRecordsCounter);
-          if (combineProcessor == null) {
+          if (combiner == null) {
             // spill directly
             DataInputBuffer key = new DataInputBuffer();
             while (spindex < mend &&
@@ -1082,7 +1082,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         Writer writer =
             new Writer(conf, finalOut, keyClass, valClass, codec,
                 spilledRecordsCounter);
-        if (combineProcessor == null || numSpills < minSpillsForCombine) {
+        if (combiner == null || numSpills < minSpillsForCombine) {
           TezMerger.writeFile(kvIter, writer,
               nullProgressable, conf);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
index 126c5b2..841e54d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
@@ -30,7 +30,14 @@ import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.util.Progressable;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 
-/** Iterates values while keys match in sorted input. */
+
+/**
+ * Iterates values while keys match in sorted input.
+ *
+ * Usage: Call moveToNext to move to the next k, v pair. This returns true if another exists,
+ * followed by getKey() and getValues() to get the current key and list of values.
+ * 
+ */
 public class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
   protected TezRawKeyValueIterator in; //input iterator
   private KEY key;               // current key

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index 0732e20..91bb6d5 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -165,7 +165,6 @@ public class ShuffledMergedInput implements LogicalInput {
 
   }
 
-
   // This functionality is currently broken. If there's inputs which need to be
   // written to disk, there's a possibility that inputs from the different
   // sources could clobber each others' output. Also the current structures do

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 2f4a62a..7a9b7e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -59,6 +59,7 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.newcombine.MRCombiner;
 import org.apache.tez.mapreduce.newpartition.MRPartitioner;
 
 
@@ -367,6 +368,18 @@ public class MRHelpers {
 
     // TODO eventually ACLs
     conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
+    
+    boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+    if (useNewApi) {
+      if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
+        conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+      }
+    } else {
+      if (conf.get("mapred.combiner.class") != null) {
+        conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+      }
+    }
+    
     setWorkingDirectory(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index 6b68e95..ad231b3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
+import org.apache.tez.mapreduce.newcombine.MRCombiner;
 import org.apache.tez.mapreduce.newpartition.MRPartitioner;
 
 import com.google.common.base.Preconditions;
@@ -220,22 +220,39 @@ public class MultiStageMRConfToTezTranslator {
     // Assuming no 0 map jobs, and the first stage is always a map.
     int numStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
 
+    // Setup Tez partitioner class
     conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS,
         MRPartitioner.class.getName());
+    
+    // Setup Tez Combiner class if required.
+    // This would already have been set since the call is via JobClient
+    boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+    if (useNewApi) {
+      if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
+        conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+      }
+    } else {
+      if (conf.get("mapred.combiner.class") != null) {
+        conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+      }
+    }
 
     Configuration confs[] = new Configuration[numStages];
     Configuration nonItermediateConf = MultiStageMRConfigUtil.extractStageConf(
         conf, "");
+    confs[0].setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
     if (numStages == 1) {
       confs[0] = nonItermediateConf;
     } else {
       confs[0] = nonItermediateConf;
       confs[numStages - 1] = new Configuration(nonItermediateConf);
+      confs[numStages -1].setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
     }
     if (numStages > 2) {
       for (int i = 1; i < numStages - 1; i++) {
         confs[i] = MultiStageMRConfigUtil.extractStageConf(conf,
             MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i, ""));
+        confs[i].setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
       }
     }
     return confs;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
new file mode 100644
index 0000000..788019a
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.newcombine;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.ValuesIterator;
+import org.apache.tez.engine.common.combine.Combiner;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.impl.TezTaskContextImpl;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MRCombiner implements Combiner {
+
+  private static Log LOG = LogFactory.getLog(MRCombiner.class);
+  
+  private final Configuration conf;
+  private final Class<?> keyClass;
+  private final Class<?> valClass;
+  private final RawComparator<?> comparator;
+  private final boolean useNewApi;
+  
+  private final TezCounter combineInputKeyCounter;
+  private final TezCounter combineInputValueCounter;
+  
+  private final MRTaskReporter reporter;
+  private final TaskAttemptID mrTaskAttemptID;
+
+  public MRCombiner(TezTaskContextImpl taskContext) throws IOException {
+    this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+
+    assert(taskContext instanceof TezInputContext || taskContext instanceof TezOutputContext);
+    if (taskContext instanceof TezOutputContext) {
+      this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
+      this.valClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
+      this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
+      this.reporter = new MRTaskReporter((TezOutputContext)taskContext);
+    } else {
+      this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+      this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+      this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
+      this.reporter = new MRTaskReporter((TezInputContext)taskContext);
+    }
+
+    this.useNewApi = ConfigUtils.useNewApi(conf);
+    
+    combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+    
+    boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false);
+    this.mrTaskAttemptID = new TaskAttemptID(
+        new TaskID(String.valueOf(taskContext.getApplicationId()
+            .getClusterTimestamp()), taskContext.getApplicationId().getId(),
+            isMap ? TaskType.MAP : TaskType.REDUCE,
+            taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber());
+    
+    LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi);
+  }
+
+  @Override
+  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
+      throws InterruptedException, IOException {
+    if (useNewApi) {
+      runNewCombiner(rawIter, writer);
+    } else {
+      runOldCombiner(rawIter, writer);
+    }
+    
+  }
+
+  ///////////////// Methods for old API //////////////////////
+  
+  private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
+    Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);
+    
+    Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);
+    
+    OutputCollector collector = new OutputCollector() {
+      @Override
+      public void collect(Object key, Object value) throws IOException {
+        writer.append(key, value);
+      }
+    };
+    
+    CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);
+    
+    while (values.moveToNext()) {
+      combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
+    }
+  }
+  
+  private final class CombinerValuesIterator<KEY,VALUE> extends ValuesIterator<KEY, VALUE> {
+    public CombinerValuesIterator(TezRawKeyValueIterator rawIter,
+        Class<KEY> keyClass, Class<VALUE> valClass,
+        RawComparator<KEY> comparator) throws IOException {
+      super(rawIter, comparator, keyClass, valClass, conf,
+          combineInputKeyCounter, combineInputValueCounter);
+    }
+  }
+  
+  ///////////////// End of methods for old API //////////////////////
+  
+  ///////////////// Methods for new API //////////////////////
+  
+  private void runNewCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws InterruptedException, IOException {
+    
+    RecordWriter recordWriter = new RecordWriter() {
+
+      @Override
+      public void write(Object key, Object value) throws IOException,
+          InterruptedException {
+        writer.append(key, value);
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+        // Will be closed by whoever invokes the combiner.
+      }
+    };
+    
+    Class<? extends org.apache.hadoop.mapreduce.Reducer> reducerClazz = (Class<? extends org.apache.hadoop.mapreduce.Reducer>) conf
+        .getClass(MRJobConfig.COMBINE_CLASS_ATTR, null,
+            org.apache.hadoop.mapreduce.Reducer.class);
+    org.apache.hadoop.mapreduce.Reducer reducer = ReflectionUtils.newInstance(reducerClazz, conf);
+    
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+        createReduceContext(
+            conf,
+            mrTaskAttemptID,
+            rawIter,
+            new MRCounters.MRCounter(combineInputKeyCounter),
+            new MRCounters.MRCounter(combineInputValueCounter),
+            recordWriter,
+            reporter,
+            (RawComparator)comparator,
+            keyClass,
+            valClass);
+    
+    reducer.run(reducerContext);
+    recordWriter.close(reducerContext);
+  }
+
+  private static <KEYIN, VALUEIN, KEYOUT, VALUEOUT> org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
+      Configuration conf,
+      TaskAttemptID mrTaskAttemptID,
+      final TezRawKeyValueIterator rawIter,
+      Counter combineInputKeyCounter,
+      Counter combineInputValueCounter,
+      RecordWriter<KEYOUT, VALUEOUT> recordWriter,
+      MRTaskReporter reporter,
+      RawComparator<KEYIN> comparator,
+      Class<KEYIN> keyClass,
+      Class<VALUEIN> valClass) throws InterruptedException, IOException {
+
+    RawKeyValueIterator r = new RawKeyValueIterator() {
+
+      @Override
+      public boolean next() throws IOException {
+        return rawIter.next();
+      }
+
+      @Override
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+
+      @Override
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+
+      @Override
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+
+      @Override
+      public void close() throws IOException {
+        rawIter.close();
+      }
+    };
+
+    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
+        conf, mrTaskAttemptID, r, combineInputKeyCounter,
+        combineInputValueCounter, recordWriter, null, reporter, comparator,
+        keyClass, valClass);
+
+    org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
+        .getReducerContext(rContext);
+    return reducerContext;
+  }
+
+  
+ 
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
index 4a967ad..dcea35c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -38,7 +39,7 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
   private org.apache.hadoop.mapred.Partitioner oldPartitioner;
 
   public MRPartitioner(Configuration conf) {
-    this.useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+    this.useNewApi = ConfigUtils.useNewApi(conf);
     this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
 
     if (useNewApi) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb0f6ffe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
index d01e562..c7c9567 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.newapi.TezInputContext;
 import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.newapi.TezProcessorContext;
 import org.apache.tez.engine.newapi.TezTaskContext;
@@ -53,6 +54,12 @@ public class MRTaskReporter
     this.reporter = new MRReporter(context);
     this.isProcessorContext = false;
   }
+  
+  public MRTaskReporter(TezInputContext context) {
+    this.context= context;
+    this.reporter = new MRReporter(context);
+    this.isProcessorContext = false;
+  }
 
   public void setProgress(float progress) {
     if (isProcessorContext) {