You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/17 00:42:46 UTC

git commit: TEZ-455. Fix local dir setup and committer setup for MRTask (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 c5ce1aba0 -> e7b591d79


TEZ-455. Fix local dir setup and committer setup for MRTask (part of
TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e7b591d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e7b591d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e7b591d7

Branch: refs/heads/TEZ-398
Commit: e7b591d794bb9df6d202e2840262b0694d9c2966
Parents: c5ce1ab
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 16 15:42:27 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 16 15:42:27 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 64 ++-----------------
 .../tez/engine/common/TezEngineUtils.java       |  2 +-
 .../engine/newapi/impl/TezInputContextImpl.java |  2 +-
 .../newapi/impl/TezOutputContextImpl.java       |  2 +-
 .../newapi/impl/TezProcessorContextImpl.java    |  2 +-
 .../tez/mapreduce/newprocessor/MRTask.java      | 66 ++++++++++++++++----
 .../newprocessor/map/MapProcessor.java          | 23 +++----
 .../newprocessor/reduce/ReduceProcessor.java    | 18 +++---
 8 files changed, 82 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index e988fa5..ab52a4e 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -37,7 +37,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -45,9 +44,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -76,6 +74,7 @@ import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
 import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
 import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
 import org.apache.tez.engine.newapi.impl.EventMetaData;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
 import org.apache.tez.engine.newapi.impl.TaskSpec;
@@ -83,7 +82,6 @@ import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.newapi.impl.TezUmbilical;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
 import org.apache.tez.mapreduce.newoutput.SimpleOutput;
@@ -447,60 +445,6 @@ public class YarnTezDagChild {
     }
   }
 
-  /**
-   * Configure mapred-local dirs. This config is used by the task for finding
-   * out an output directory.
-   * @throws IOException
-   */
-  /**
-   * Configure tez-local-dirs, tez-localized-file-dir, etc. Also create these
-   * dirs.
-   */
-
-  private static void configureLocalDirs(Configuration conf) throws IOException {
-    String[] localSysDirs = StringUtils.getTrimmedStrings(
-        System.getenv(Environment.LOCAL_DIRS.name()));
-    conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs);
-    conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
-        System.getenv(Environment.PWD.name()));
-
-    LOG.info(TezJobConfig.LOCAL_DIRS + " for child: " +
-        conf.get(TezJobConfig.LOCAL_DIRS));
-    LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
-        + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
-
-    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-    Path workDir = null;
-    // First, try to find the JOB_LOCAL_DIR on this host.
-    try {
-      workDir = lDirAlloc.getLocalPathToRead("work", conf);
-    } catch (DiskErrorException e) {
-      // DiskErrorException means dir not found. If not found, it will
-      // be created below.
-    }
-    if (workDir == null) {
-      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
-      workDir = lDirAlloc.getLocalPathForWrite("work", conf);
-      FileSystem lfs = FileSystem.getLocal(conf).getRaw();
-      boolean madeDir = false;
-      try {
-        madeDir = lfs.mkdirs(workDir);
-      } catch (FileAlreadyExistsException e) {
-        // Since all tasks will be running in their own JVM, the race condition
-        // exists where multiple tasks could be trying to create this directory
-        // at the same time. If this task loses the race, it's okay because
-        // the directory already exists.
-        madeDir = true;
-        workDir = lDirAlloc.getLocalPathToRead("work", conf);
-      }
-      if (!madeDir) {
-          throw new IOException("Mkdirs failed to create "
-              + workDir.toString());
-      }
-    }
-    conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
-  }
-
   private static LogicalIOProcessorRuntimeTask createLogicalTask(int attemptNum,
       TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical,
       Token<JobTokenIdentifier> jobToken) throws IOException {
@@ -508,7 +452,6 @@ public class YarnTezDagChild {
     // FIXME TODONEWTEZ
     conf.setBoolean("ipc.client.tcpnodelay", true);
     conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, attemptNum);
-    configureLocalDirs(conf);
     FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
 
     // FIXME need Input/Output vertices else we have this hack
@@ -528,6 +471,9 @@ public class YarnTezDagChild {
       taskSpec.getOutputs().add(
           new OutputSpec("null", simpleOutputDesc, 0));
     }
+    String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+    LOG.info("LocalDirs for child: " + localDirs);
     return new LogicalIOProcessorRuntimeTask(taskSpec, conf,
         tezUmbilical, jobToken);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index f34092c..da333a2 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -60,7 +60,7 @@ public class TezEngineUtils {
       throw new IOException("Unable to find Partitioner class in config", e);
     }
 
-    LOG.info("ZZZ: Using partitioner class: " + clazz.getName());
+    LOG.info("Using partitioner class: " + clazz.getName());
 
     Partitioner partitioner = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 72a36d9..b761205 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -52,7 +52,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.sourceInfo = new EventMetaData(
         EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
         taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
         .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
         getTaskIndex(), getAttemptNumber(), sourceVertexName);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index ba48b71..f4dbc9e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -52,7 +52,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
         taskVertexName, destinationVertexName, taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
         .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
         getTaskIndex(), getAttemptNumber(), destinationVertexName);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index e769550..f5e5b8d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -48,7 +48,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
     this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", taskAttemptID
         .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
         getTaskIndex(), getAttemptNumber());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
index bfa73c4..ceb8e81 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
@@ -36,10 +36,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -62,6 +64,8 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
@@ -101,7 +105,7 @@ public abstract class MRTask {
   protected GcTimeUpdater gcUpdater;
   private ResourceCalculatorProcessTree pTree;
   private long initCpuCumulativeTime = 0;
-  protected TezProcessorContext tezEngineTaskContext;
+  protected TezProcessorContext processorContext;
   protected TaskAttemptID taskAttemptId;
   protected Progress progress = new Progress();
   protected SecretKey jobTokenSecret;
@@ -138,7 +142,7 @@ public abstract class MRTask {
     
     DeprecatedKeys.init();
 
-    tezEngineTaskContext = context;
+    processorContext = context;
     counters = context.getCounters();
     this.taskAttemptId = new TaskAttemptID(
         new TaskID(
@@ -174,7 +178,7 @@ public abstract class MRTask {
     // vertex. A solution like this may be more practical once it's possible to
     // submit configuration parameters to the AM and effectively tasks via RPC.
 
-    jobConf.set(MRJobConfig.VERTEX_NAME, tezEngineTaskContext.getTaskVertexName());
+    jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());
 
     if (LOG.isDebugEnabled() && userPayload != null) {
       Iterator<Entry<String, String>> iter = jobConf.iterator();
@@ -203,7 +207,7 @@ public abstract class MRTask {
     // Containers.
     // Set it in conf, so as to be able to be used the the OutputCommitter.
     // TODO should this be DAG Id
-    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, tezEngineTaskContext
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, processorContext
         .getApplicationId().getId());
 
     jobConf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
@@ -220,9 +224,8 @@ public abstract class MRTask {
       LOG.warn("No job token set");
     }
 
-    jobConf.set(MRJobConfig.JOB_LOCAL_DIR, jobConf.get(TezJobConfig.JOB_LOCAL_DIR));
-    jobConf.set(MRConfig.LOCAL_DIR, jobConf.get(TezJobConfig.LOCAL_DIRS));
-    
+    configureLocalDirs();
+
     if (jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY) != null) {
       jobConf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
           jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY));
@@ -231,6 +234,47 @@ public abstract class MRTask {
     // Set up the DistributedCache related configs
     setupDistributedCacheConfig(jobConf);
   }
+  
+  private void configureLocalDirs() throws IOException {
+    // TODO NEWTEZ Is most of this functionality required ?
+    jobConf.setStrings(TezJobConfig.LOCAL_DIRS, processorContext.getWorkDirs());
+    jobConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, System.getenv(Environment.PWD.name()));
+    
+    jobConf.setStrings(MRConfig.LOCAL_DIR, processorContext.getWorkDirs());
+    
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+    Path workDir = null;
+    // First, try to find the JOB_LOCAL_DIR on this host.
+    try {
+      workDir = lDirAlloc.getLocalPathToRead("work", jobConf);
+    } catch (DiskErrorException e) {
+      // DiskErrorException means dir not found. If not found, it will
+      // be created below.
+    }
+    if (workDir == null) {
+      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
+      workDir = lDirAlloc.getLocalPathForWrite("work", jobConf);
+      FileSystem lfs = FileSystem.getLocal(jobConf).getRaw();
+      boolean madeDir = false;
+      try {
+        madeDir = lfs.mkdirs(workDir);
+      } catch (FileAlreadyExistsException e) {
+        // Since all tasks will be running in their own JVM, the race condition
+        // exists where multiple tasks could be trying to create this directory
+        // at the same time. If this task loses the race, it's okay because
+        // the directory already exists.
+        madeDir = true;
+        workDir = lDirAlloc.getLocalPathToRead("work", jobConf);
+      }
+      if (!madeDir) {
+          throw new IOException("Mkdirs failed to create "
+              + workDir.toString());
+      }
+    }
+    // TODO NEWTEZ Is this required ?
+    jobConf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
+    jobConf.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
+  }
 
   /**
    * Set up the DistributedCache related configs to make
@@ -299,12 +343,12 @@ public abstract class MRTask {
   }
 
   public TezProcessorContext getUmbilical() {
-    return this.tezEngineTaskContext;
+    return this.processorContext;
   }
 
   public void initTask() throws IOException,
                                 InterruptedException {
-    this.mrReporter = new MRTaskReporter(tezEngineTaskContext);
+    this.mrReporter = new MRTaskReporter(processorContext);
     this.useNewApi = jobConf.getUseNewMapper();
     TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getTaskID()
         .getVertexID().getDAGId();
@@ -513,7 +557,7 @@ public abstract class MRTask {
 
   private void commit(org.apache.hadoop.mapreduce.OutputCommitter committer
       ) throws IOException {
-    while (!tezEngineTaskContext.canCommit()) {
+    while (!processorContext.canCommit()) {
       // This will loop till the AM asks for the task to be killed. As
       // against, the AM sending a signal to the task to kill itself
       // gracefully.
@@ -732,6 +776,6 @@ public abstract class MRTask {
   }
 
   public TezProcessorContext getTezEngineTaskContext() {
-    return tezEngineTaskContext;
+    return processorContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
index ab4fc8e..b372536 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
@@ -47,6 +47,7 @@ import org.apache.tez.engine.newapi.TezProcessorContext;
 import org.apache.tez.mapreduce.hadoop.newmapreduce.MapContextImpl;
 import org.apache.tez.mapreduce.newinput.SimpleInput;
 import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
+import org.apache.tez.mapreduce.newoutput.SimpleOutput;
 import org.apache.tez.mapreduce.newprocessor.MRTask;
 import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
 
@@ -85,7 +86,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
   public void run(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
 
-    LOG.info("Running map: " + tezEngineTaskContext.getUniqueIdentifier());
+    LOG.info("Running map: " + processorContext.getUniqueIdentifier());
     
     initTask();
 
@@ -112,15 +113,11 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
     
     KVWriter kvWriter = ((OnFileSortedOutput)out).getWriter();
     
-    //TODO fix committer
-//    if (out instanceof SimpleOutput) {
-//      initCommitter(jobConf, useNewApi, false);
-//      ((SimpleOutput)out).setTask(this);
-//    } else if (out instanceof SortingOutput) {
-//      initCommitter(jobConf, useNewApi, true);
-//      initPartitioner(jobConf);
-//      ((SortingOutput)out).setTask(this);
-//    }
+    if (out instanceof SimpleOutput) {
+      initCommitter(jobConf, useNewApi, false);
+    } else { // Assuming no other output needs commit.
+      initCommitter(jobConf, useNewApi, true);
+    }
 
     if (useNewApi) {
       runNewMapper(jobConf, mrReporter, input, kvWriter, doingShuffle);
@@ -195,7 +192,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
         job, taskAttemptId, 
         input, output, 
         getCommitter(), 
-        tezEngineTaskContext, split);
+        processorContext, split);
 
     org.apache.hadoop.mapreduce.Mapper.Context mapperContext = 
         new WrappedMapper().getMapContext(mapContext);
@@ -342,12 +339,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
   
   @Override
   public TezCounter getOutputRecordsCounter() {
-    return tezEngineTaskContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+    return processorContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
   }
 
   @Override
   public TezCounter getInputRecordsCounter() {
-    return tezEngineTaskContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
+    return processorContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
index e597979..a5bf16b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
@@ -95,7 +95,7 @@ implements LogicalIOProcessor {
   public void run(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
 
-    LOG.info("Running reduce: " + tezEngineTaskContext.getUniqueIdentifier());
+    LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
     
     initTask();
     
@@ -112,13 +112,11 @@ implements LogicalIOProcessor {
     Input in = inputs.values().iterator().next();
     Output out = outputs.values().iterator().next();
     
-//    if (out instanceof SimpleOutput) {
-//      initCommitter(jobConf, useNewApi, false);
-//    } else if (out instanceof SortingOutput) {
-//      initCommitter(jobConf, useNewApi, true);
-//      initPartitioner(jobConf);
-//      ((SortingOutput)out).setTask(this);
-//    }
+    if (out instanceof SimpleOutput) {
+      initCommitter(jobConf, useNewApi, false);
+    } else {
+      initCommitter(jobConf, useNewApi, true);
+    }
 
     this.statusUpdate();
     
@@ -352,12 +350,12 @@ implements LogicalIOProcessor {
 
   @Override
   public TezCounter getOutputRecordsCounter() {
-    return tezEngineTaskContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
+    return processorContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
   }
 
   @Override
   public TezCounter getInputRecordsCounter() {
-    return tezEngineTaskContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
   }
 
 }