Posted to commits@tez.apache.org by hi...@apache.org on 2013/05/07 22:39:03 UTC

git commit: TEZ-103. Configured Combiner is not being used.

Updated Branches:
  refs/heads/TEZ-1 b102eb1c6 -> 18f0ebd8e


TEZ-103. Configured Combiner is not being used.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/18f0ebd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/18f0ebd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/18f0ebd8

Branch: refs/heads/TEZ-1
Commit: 18f0ebd8e562a01fce6e39d7a0977ce2f8b9d3a1
Parents: b102eb1
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 6 18:16:20 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue May 7 13:38:41 2013 -0700

----------------------------------------------------------------------
 .../tez/engine/common/combine/CombineInput.java    |    6 +-
 .../engine/common/shuffle/impl/MergeManager.java   |   55 ++-
 .../engine/common/sort/impl/ExternalSorter.java    |   15 +-
 .../apache/tez/engine/common/sort/impl/IFile.java  |    5 +
 .../engine/common/sort/impl/IFileOutputStream.java |    3 +
 .../common/sort/impl/dflt/DefaultSorter.java       |   12 +-
 .../task/local/output/TezTaskOutputFiles.java      |   11 +-
 .../apache/tez/mapreduce/combine/MRCombiner.java   |  348 +++++++++++++++
 .../org/apache/tez/mapreduce/processor/MRTask.java |    3 +
 9 files changed, 423 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
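
For context, this patch makes the Tez MapReduce layer pick up a combiner that has been registered on the job configuration. The snippet below is a minimal, hypothetical sketch (not part of this commit) showing how such a combiner is normally registered through the standard Hadoop MapReduce API; the class and job names are illustrative only.

  // Hypothetical example, not part of this commit: registering a combiner
  // on a MapReduce job via the standard new-API Hadoop classes.
  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Reducer;

  public class CombinerConfigExample {

    // Illustrative combiner: sums partial counts for each key on the map side.
    public static class SumCombiner
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable result = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
      }
    }

    public static void main(String[] args) throws Exception {
      Job job = Job.getInstance(new Configuration(), "combiner-example");
      job.setJarByClass(CombinerConfigExample.class);
      // The combiner registered here ends up in the job configuration,
      // which is what the Tez-side combine path reads at runtime.
      job.setCombinerClass(SumCombiner.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);
      // ... mapper, reducer, and input/output paths would be set here ...
    }
  }
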


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
index 707e54c..bf504bb 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
@@ -121,7 +121,11 @@ public class CombineInput implements Input {
   public void close() throws IOException {
     input.close();
   }
-  
+
+  public TezRawKeyValueIterator getIterator() {
+    return this.input;
+  }
+
   protected class ValueIterator implements Iterator<Object> {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index 55860ea..9156f28 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -373,12 +373,17 @@ public class MergeManager {
     
     CombineOutput combineOut = new CombineOutput(writer);
     combineOut.initialize(conf, reporter);
-    
-    combineProcessor.process(new Input[] {combineIn},
-        new Output[] {combineOut});
-    
-    combineIn.close();
-    combineOut.close();
+
+    try {
+      combineProcessor.process(new Input[] {combineIn},
+          new Output[] {combineOut});
+    } catch (IOException ioe) {
+      try {
+        combineProcessor.close();
+      } catch (IOException ignoredException) {}
+
+      throw ioe;
+    }
   
   }
 
@@ -471,31 +476,33 @@ public class MergeManager {
                                            mergeOutputSize).suffix(
                                                Constants.MERGED_OUTPUT_PREFIX);
 
-      Writer writer = 
-        new Writer(conf, rfs, outputPath,
-                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                        codec, null);
-
-      TezRawKeyValueIterator rIter = null;
+      Writer writer = null;
       try {
+        writer =
+            new Writer(conf, rfs, outputPath,
+                (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+                (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+                codec, null);
+
+        TezRawKeyValueIterator rIter = null;
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
-                 " segments...");
-        
+            " segments...");
+
         rIter = TezMerger.merge(conf, rfs,
-                             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-                             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                             inMemorySegments, inMemorySegments.size(),
-                             new Path(taskAttemptId.toString()),
-                             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-                             reporter, spilledRecordsCounter, null, null);
-        
+            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+            inMemorySegments, inMemorySegments.size(),
+            new Path(taskAttemptId.toString()),
+            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+            reporter, spilledRecordsCounter, null, null);
+
         if (null == combineProcessor) {
           TezMerger.writeFile(rIter, writer, reporter, conf);
         } else {
           runCombineProcessor(rIter, writer);
         }
         writer.close();
+        writer = null;
 
         LOG.info(taskAttemptId +  
             " Merge of the " + noInMemorySegments +
@@ -507,6 +514,10 @@ public class MergeManager {
         //earlier when we invoked cloneFileAttributes
         localFS.delete(outputPath, true);
         throw e;
+      } finally {
+        if (writer != null) {
+          writer.close();
+        }
       }
 
       // Note the output of the merge

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 6d07a1c..f6af426 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -193,11 +193,18 @@ public abstract class ExternalSorter {
     CombineOutput combineOut = new CombineOutput(writer);
     combineOut.initialize(job, runningTaskContext.getTaskReporter());
 
-    combineProcessor.process(new Input[] {combineIn},
-        new Output[] {combineOut});
+    try {
+      combineProcessor.process(new Input[] {combineIn},
+          new Output[] {combineOut});
+    } catch (IOException ioe) {
+      try {
+        combineProcessor.close();
+      } catch (IOException ignored) {}
 
-    combineIn.close();
-    combineOut.close();
+      // Do not close output here as the sorter should close the combine output
+
+      throw ioe;
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
index 161cc5a..db59a13 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -73,6 +74,7 @@ public class IFile {
     boolean ownOutputStream = false;
     long start = 0;
     FSDataOutputStream rawOut;
+    AtomicBoolean closed = new AtomicBoolean(false);
     
     CompressionOutputStream compressedOut;
     Compressor compressor;
@@ -153,6 +155,9 @@ public class IFile {
     }
 
     public void close() throws IOException {
+      if (closed.getAndSet(true)) {
+        throw new IOException("Writer was already closed earlier");
+      }
 
       // When IFile writer is created by BackupStore, we do not have
       // Key and Value classes set. So, check before closing the

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
index 45cf917..75fcd68 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
@@ -101,12 +101,15 @@ public class IFileOutputStream extends FilterOutputStream {
       sum.update(buffer, 0, offset);
       offset = 0;
     }
+    /*
+    // FIXME if needed re-enable this in debug mode
     if (LOG.isDebugEnabled()) {
       LOG.debug("XXX checksum" +
           " b=" + b + " off=" + off + 
           " buffer=" + " offset=" + offset + 
           " len=" + len);
     }
+    */
     /* now we should have len < buffer.length */
     System.arraycopy(b, off, buffer, offset, len);
     offset += len;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
index 75c7de3..0c35760 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
@@ -673,6 +673,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
             spillLock.unlock();
             sortAndSpill();
           } catch (Throwable t) {
+            LOG.warn("ZZZZ: Got an exception in sortAndSpill", t);
             sortSpillException = t;
           } finally {
             spillLock.lock();
@@ -794,6 +795,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
             if (spstart != spindex) {
               TezRawKeyValueIterator kvIter =
                 new MRResultIterator(spstart, spindex);
+              LOG.info("DEBUG: Running combine processor");
               runCombineProcessor(kvIter, writer);
             }
           }
@@ -1052,7 +1054,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       sortPhase.complete();
       return;
     }
-    {
+    else {
       sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
       TezMerger.considerFinalMergeForProgress();
       
@@ -1096,16 +1098,16 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         long segmentStart = finalOut.getPos();
         Writer writer =
             new Writer(job, finalOut, keyClass, valClass, codec,
-                             spilledRecordsCounter);
+                spilledRecordsCounter);
         if (combineProcessor == null || numSpills < minSpillsForCombine) {
-          TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
-          writer.close();
+          TezMerger.writeFile(kvIter, writer,
+              runningTaskContext.getTaskReporter(), job);
         } else {
           runCombineProcessor(kvIter, writer);
         }
+        writer.close();
 
         sortPhase.startNextPhase();
-        
         // record offsets
         final TezIndexRecord rec = 
             new TezIndexRecord(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
index c925367..eeca130 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
@@ -20,6 +20,8 @@ package org.apache.tez.engine.common.task.local.output;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,7 @@ import org.apache.tez.engine.records.TezTaskID;
 @InterfaceStability.Unstable
 public class TezTaskOutputFiles extends TezTaskOutput {
 
+  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
   private Configuration conf;
 
   private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
@@ -56,9 +59,11 @@ public class TezTaskOutputFiles extends TezTaskOutput {
     new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
 
   private Path getAttemptOutputDir() {
-    System.err.println("getAttemptOutputDir: " + 
-        Constants.TASK_OUTPUT_DIR + "/" + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
-    return new Path(Constants.TASK_OUTPUT_DIR, conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    LOG.info("DEBUG: getAttemptOutputDir: "
+        + Constants.TASK_OUTPUT_DIR + "/"
+        + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    return new Path(Constants.TASK_OUTPUT_DIR,
+        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
new file mode 100644
index 0000000..42eddee
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.combine;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.combine.CombineOutput;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+public class MRCombiner implements Processor {
+
+  private static Log LOG = LogFactory.getLog(MRCombiner.class);
+
+  JobConf jobConf;
+  boolean useNewApi;
+
+  private final MRTask task;
+
+  private Counter combinerInputKeyCounter;
+  private Counter combinerInputValueCounter;
+  private Progress combinePhase;
+
+  public MRCombiner(MRTask task) {
+    this.task = task;
+  }
+
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf)conf;
+    } else {
+      jobConf = new JobConf(conf);
+    }
+    useNewApi = jobConf.getUseNewMapper();
+  }
+
+  @Override
+  public void process(Input[] in, Output[] out) throws IOException,
+      InterruptedException {
+    LOG.info("DEBUG: Running MRCombiner"
+        + ", usingNewAPI=" + useNewApi);
+
+    CombineInput input = (CombineInput)in[0];
+    CombineOutput output = (CombineOutput)out[0];
+
+    combinePhase  = task.getProgress().addPhase("combine");
+
+    Class<?> keyClass = ConfigUtils.getIntermediateOutputKeyClass(jobConf);
+    Class<?> valueClass = ConfigUtils.getIntermediateOutputValueClass(jobConf);
+    LOG.info("Using combineKeyClass: " + keyClass);
+    LOG.info("Using combineValueClass: " + valueClass);
+    RawComparator<?> comparator =
+        ConfigUtils.getIntermediateOutputKeyComparator(jobConf);
+    LOG.info("Using combineComparator: " + comparator);
+
+    combinerInputKeyCounter =
+        task.getMRReporter().getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combinerInputValueCounter =
+        task.getMRReporter().getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    if (useNewApi) {
+      try {
+        runNewCombiner(this.jobConf,
+            task.getUmbilical(),
+            task.getMRReporter(),
+            input, comparator, keyClass, valueClass, output);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } else {
+      runOldCombiner(this.jobConf,
+          task.getUmbilical(),
+          task.getMRReporter(),
+          input,
+          comparator, keyClass, valueClass,
+          output);
+    }
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void runOldCombiner(JobConf job,
+        TezTaskUmbilicalProtocol umbilical,
+        final MRTaskReporter reporter,
+        CombineInput input,
+        RawComparator comparator,
+        Class keyClass,
+        Class valueClass,
+        final Output output) throws IOException, InterruptedException {
+
+    Reducer combiner =
+        ReflectionUtils.newInstance(job.getCombinerClass(), job);
+
+    // make output collector
+
+    OutputCollector collector =
+        new OutputCollector() {
+      public void collect(Object key, Object value)
+          throws IOException {
+        try {
+          output.write(key, value);
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+      }
+    };
+
+    // apply combiner function
+    CombinerValuesIterator values =
+        new CombinerValuesIterator(input,
+            comparator, keyClass, valueClass, job, reporter,
+            combinerInputValueCounter, combinePhase);
+
+    values.informReduceProgress();
+    while (values.more()) {
+      combinerInputKeyCounter.increment(1);
+      combiner.reduce(values.getKey(), values, collector, reporter);
+      values.nextKey();
+      values.informReduceProgress();
+    }
+  }
+
+  private static final class CombinerValuesIterator<KEY,VALUE>
+  extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> {
+    private Counter combineInputValueCounter;
+    private Progress combinePhase;
+
+    public CombinerValuesIterator (CombineInput in,
+        RawComparator<KEY> comparator,
+        Class<KEY> keyClass,
+        Class<VALUE> valClass,
+        Configuration conf, Progressable reporter,
+        Counter combineInputValueCounter,
+        Progress combinePhase)
+            throws IOException {
+      super(in.getIterator(), comparator, keyClass, valClass, conf, reporter);
+      this.combineInputValueCounter = combineInputValueCounter;
+      this.combinePhase = combinePhase;
+    }
+
+    @Override
+    public VALUE next() {
+      combineInputValueCounter.increment(1);
+      return moveToNext();
+    }
+
+    protected VALUE moveToNext() {
+      return super.next();
+    }
+
+    public void informReduceProgress() {
+      combinePhase.set(super.in.getProgress().getProgress()); // update progress
+      reporter.progress();
+    }
+  }
+
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void runNewCombiner(JobConf job,
+      final TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      CombineInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final Output out
+      ) throws IOException,InterruptedException,
+      ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final TezRawKeyValueIterator rawIter = input.getIterator();
+    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        // FIXME progress updates for combiner
+        // reporter.setProgress(rawIter.getProgress().getProgress());
+        return ret;
+      }
+    };
+
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, task.getTaskAttemptId(), reporter);
+
+    // make a reducer
+    org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
+
+    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
+        new org.apache.hadoop.mapreduce.RecordWriter() {
+
+          @Override
+          public void write(Object key, Object value) throws IOException,
+              InterruptedException {
+            out.write(key, value);
+          }
+
+          @Override
+          public void close(TaskAttemptContext context) throws IOException,
+              InterruptedException {
+            // Should not close this here as the sorter will close the
+            // combine output
+          }
+        };
+
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+        createReduceContext(
+            reducer, job, task.getTaskAttemptId(),
+            rIter, combinerInputKeyCounter,
+            combinerInputValueCounter,
+            trackedRW,
+            null,
+            reporter, comparator, keyClass,
+            valueClass);
+    reducer.run(reducerContext);
+    trackedRW.close(reducerContext);
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException {
+  }
+
+  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+  createReduceContext(org.apache.hadoop.mapreduce.Reducer
+                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
+                      Configuration job,
+                      TezTaskAttemptID taskId,
+                      final TezRawKeyValueIterator rIter,
+                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
+                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
+                      org.apache.hadoop.mapreduce.OutputCommitter committer,
+                      org.apache.hadoop.mapreduce.StatusReporter reporter,
+                      RawComparator<INKEY> comparator,
+                      Class<INKEY> keyClass, Class<INVALUE> valueClass
+  ) throws IOException, InterruptedException {
+    RawKeyValueIterator r =
+        new RawKeyValueIterator() {
+
+          @Override
+          public boolean next() throws IOException {
+            return rIter.next();
+          }
+
+          @Override
+          public DataInputBuffer getValue() throws IOException {
+            return rIter.getValue();
+          }
+
+          @Override
+          public Progress getProgress() {
+            return rIter.getProgress();
+          }
+
+          @Override
+          public DataInputBuffer getKey() throws IOException {
+            return rIter.getKey();
+          }
+
+          @Override
+          public void close() throws IOException {
+            rIter.close();
+          }
+        };
+    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
+    reduceContext =
+      new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
+          job,
+          IDConverter.toMRTaskAttemptId(taskId),
+          r,
+          inputKeyCounter,
+          inputValueCounter,
+          output,
+          committer,
+          reporter,
+          comparator,
+          keyClass,
+          valueClass);
+    LOG.info("DEBUG: Using combineKeyClass: "
+          + keyClass + ", combineValueClass: " + valueClass);
+
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+        reducerContext = new
+          WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
+              reduceContext);
+
+    return reducerContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 17fab1b..d17e477 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -68,6 +68,7 @@ import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.engine.records.TezDAGID;
 import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRTaskStatus;
@@ -216,6 +217,8 @@ extends RunningTaskContext {
     
     partitioner = new MRPartitioner(this);
     ((MRPartitioner)partitioner).initialize(job, getTaskReporter());
+    combineProcessor = new MRCombiner(this);
+    combineProcessor.initialize(job, getTaskReporter());
 
     localizeConfiguration(jobConf);
   }