You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/17 00:42:46 UTC
git commit: TEZ-455. Fix local dir setup and committer setup for
MRTask (part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 c5ce1aba0 -> e7b591d79
TEZ-455. Fix local dir setup and committer setup for MRTask (part of
TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e7b591d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e7b591d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e7b591d7
Branch: refs/heads/TEZ-398
Commit: e7b591d794bb9df6d202e2840262b0694d9c2966
Parents: c5ce1ab
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 16 15:42:27 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 16 15:42:27 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 64 ++-----------------
.../tez/engine/common/TezEngineUtils.java | 2 +-
.../engine/newapi/impl/TezInputContextImpl.java | 2 +-
.../newapi/impl/TezOutputContextImpl.java | 2 +-
.../newapi/impl/TezProcessorContextImpl.java | 2 +-
.../tez/mapreduce/newprocessor/MRTask.java | 66 ++++++++++++++++----
.../newprocessor/map/MapProcessor.java | 23 +++----
.../newprocessor/reduce/ReduceProcessor.java | 18 +++---
8 files changed, 82 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index e988fa5..ab52a4e 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -37,7 +37,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -45,9 +44,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -76,6 +74,7 @@ import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
import org.apache.tez.engine.newapi.impl.EventMetaData;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.engine.newapi.impl.InputSpec;
import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.newapi.impl.TaskSpec;
@@ -83,7 +82,6 @@ import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.newapi.impl.TezUmbilical;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
import org.apache.tez.mapreduce.newoutput.SimpleOutput;
@@ -447,60 +445,6 @@ public class YarnTezDagChild {
}
}
- /**
- * Configure mapred-local dirs. This config is used by the task for finding
- * out an output directory.
- * @throws IOException
- */
- /**
- * Configure tez-local-dirs, tez-localized-file-dir, etc. Also create these
- * dirs.
- */
-
- private static void configureLocalDirs(Configuration conf) throws IOException {
- String[] localSysDirs = StringUtils.getTrimmedStrings(
- System.getenv(Environment.LOCAL_DIRS.name()));
- conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs);
- conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
- System.getenv(Environment.PWD.name()));
-
- LOG.info(TezJobConfig.LOCAL_DIRS + " for child: " +
- conf.get(TezJobConfig.LOCAL_DIRS));
- LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
- + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
-
- LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
- Path workDir = null;
- // First, try to find the JOB_LOCAL_DIR on this host.
- try {
- workDir = lDirAlloc.getLocalPathToRead("work", conf);
- } catch (DiskErrorException e) {
- // DiskErrorException means dir not found. If not found, it will
- // be created below.
- }
- if (workDir == null) {
- // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
- workDir = lDirAlloc.getLocalPathForWrite("work", conf);
- FileSystem lfs = FileSystem.getLocal(conf).getRaw();
- boolean madeDir = false;
- try {
- madeDir = lfs.mkdirs(workDir);
- } catch (FileAlreadyExistsException e) {
- // Since all tasks will be running in their own JVM, the race condition
- // exists where multiple tasks could be trying to create this directory
- // at the same time. If this task loses the race, it's okay because
- // the directory already exists.
- madeDir = true;
- workDir = lDirAlloc.getLocalPathToRead("work", conf);
- }
- if (!madeDir) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- }
- conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
- }
-
private static LogicalIOProcessorRuntimeTask createLogicalTask(int attemptNum,
TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical,
Token<JobTokenIdentifier> jobToken) throws IOException {
@@ -508,7 +452,6 @@ public class YarnTezDagChild {
// FIXME TODONEWTEZ
conf.setBoolean("ipc.client.tcpnodelay", true);
conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, attemptNum);
- configureLocalDirs(conf);
FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
// FIXME need Input/Output vertices else we have this hack
@@ -528,6 +471,9 @@ public class YarnTezDagChild {
taskSpec.getOutputs().add(
new OutputSpec("null", simpleOutputDesc, 0));
}
+ String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())); // dirs come from the LOCAL_DIRS environment variable
+ conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+ LOG.info("LocalDirs for child: " + java.util.Arrays.toString(localDirs)); // format the elements; concatenating the array itself logs only its type and hash
return new LogicalIOProcessorRuntimeTask(taskSpec, conf,
tezUmbilical, jobToken);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index f34092c..da333a2 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -60,7 +60,7 @@ public class TezEngineUtils {
throw new IOException("Unable to find Partitioner class in config", e);
}
- LOG.info("ZZZ: Using partitioner class: " + clazz.getName());
+ LOG.info("Using partitioner class: " + clazz.getName());
Partitioner partitioner = null;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 72a36d9..b761205 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -52,7 +52,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
this.sourceInfo = new EventMetaData(
EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
taskAttemptID);
- this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+ this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
.getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
getTaskIndex(), getAttemptNumber(), sourceVertexName);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index ba48b71..f4dbc9e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -52,7 +52,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
this.destinationVertexName = destinationVertexName;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
taskVertexName, destinationVertexName, taskAttemptID);
- this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+ this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
.getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
getTaskIndex(), getAttemptNumber(), destinationVertexName);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index e769550..f5e5b8d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -48,7 +48,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
this.userPayload = userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
taskVertexName, "", taskAttemptID);
- this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID
+ this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", taskAttemptID
.getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
getTaskIndex(), getAttemptNumber());
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
index bfa73c4..ceb8e81 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
@@ -36,10 +36,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -62,6 +64,8 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
@@ -101,7 +105,7 @@ public abstract class MRTask {
protected GcTimeUpdater gcUpdater;
private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0;
- protected TezProcessorContext tezEngineTaskContext;
+ protected TezProcessorContext processorContext;
protected TaskAttemptID taskAttemptId;
protected Progress progress = new Progress();
protected SecretKey jobTokenSecret;
@@ -138,7 +142,7 @@ public abstract class MRTask {
DeprecatedKeys.init();
- tezEngineTaskContext = context;
+ processorContext = context;
counters = context.getCounters();
this.taskAttemptId = new TaskAttemptID(
new TaskID(
@@ -174,7 +178,7 @@ public abstract class MRTask {
// vertex. A solution like this may be more practical once it's possible to
// submit configuration parameters to the AM and effectively tasks via RPC.
- jobConf.set(MRJobConfig.VERTEX_NAME, tezEngineTaskContext.getTaskVertexName());
+ jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());
if (LOG.isDebugEnabled() && userPayload != null) {
Iterator<Entry<String, String>> iter = jobConf.iterator();
@@ -203,7 +207,7 @@ public abstract class MRTask {
// Containers.
// Set it in conf, so as to be able to be used by the OutputCommitter.
// TODO should this be DAG Id
- jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, tezEngineTaskContext
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, processorContext
.getApplicationId().getId());
jobConf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
@@ -220,9 +224,8 @@ public abstract class MRTask {
LOG.warn("No job token set");
}
- jobConf.set(MRJobConfig.JOB_LOCAL_DIR, jobConf.get(TezJobConfig.JOB_LOCAL_DIR));
- jobConf.set(MRConfig.LOCAL_DIR, jobConf.get(TezJobConfig.LOCAL_DIRS));
-
+ configureLocalDirs();
+
if (jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY) != null) {
jobConf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY));
@@ -231,6 +234,47 @@ public abstract class MRTask {
// Set up the DistributedCache related configs
setupDistributedCacheConfig(jobConf);
}
+
+ private void configureLocalDirs() throws IOException { // Copies local-dir settings into jobConf and locates or creates the "work" dir.
+ // Throws IOException if mkdirs reports failure. TODO NEWTEZ: is most of this functionality required?
+ jobConf.setStrings(TezJobConfig.LOCAL_DIRS, processorContext.getWorkDirs()); // Tez key, fed from the processor context's work dirs
+ jobConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, System.getenv(Environment.PWD.name())); // value is this process's PWD environment variable
+
+ jobConf.setStrings(MRConfig.LOCAL_DIR, processorContext.getWorkDirs()); // same work dirs, published under the MR key
+
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); // allocator is keyed on the property set above
+ Path workDir = null;
+ // First, try to find an existing "work" dir (the JOB_LOCAL_DIR) on this host.
+ try {
+ workDir = lDirAlloc.getLocalPathToRead("work", jobConf);
+ } catch (DiskErrorException e) {
+ // Deliberately ignored: DiskErrorException here is taken to mean the dir
+ // was not found; workDir stays null and the dir is created below.
+ }
+ if (workDir == null) {
+ // JOB_LOCAL_DIR doesn't exist on this host -- create it.
+ workDir = lDirAlloc.getLocalPathForWrite("work", jobConf);
+ FileSystem lfs = FileSystem.getLocal(jobConf).getRaw();
+ boolean madeDir = false;
+ try {
+ madeDir = lfs.mkdirs(workDir);
+ } catch (FileAlreadyExistsException e) {
+ // Since all tasks will be running in their own JVM, a race condition
+ // exists where multiple tasks could be trying to create this directory
+ // at the same time. If this task loses the race, that is fine because
+ // the directory already exists; treat it as success.
+ madeDir = true;
+ workDir = lDirAlloc.getLocalPathToRead("work", jobConf); // re-resolve the path now that the directory exists
+ }
+ if (!madeDir) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
+ }
+ }
+ // Publish the resolved work dir under both the Tez and the MR key. TODO NEWTEZ: is this required?
+ jobConf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
+ jobConf.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
+ }
/**
* Set up the DistributedCache related configs to make
@@ -299,12 +343,12 @@ public abstract class MRTask {
}
public TezProcessorContext getUmbilical() {
- return this.tezEngineTaskContext;
+ return this.processorContext;
}
public void initTask() throws IOException,
InterruptedException {
- this.mrReporter = new MRTaskReporter(tezEngineTaskContext);
+ this.mrReporter = new MRTaskReporter(processorContext);
this.useNewApi = jobConf.getUseNewMapper();
TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getTaskID()
.getVertexID().getDAGId();
@@ -513,7 +557,7 @@ public abstract class MRTask {
private void commit(org.apache.hadoop.mapreduce.OutputCommitter committer
) throws IOException {
- while (!tezEngineTaskContext.canCommit()) {
+ while (!processorContext.canCommit()) {
// This will loop till the AM asks for the task to be killed, as
// opposed to the AM sending a signal to the task to kill itself
// gracefully.
@@ -732,6 +776,6 @@ public abstract class MRTask {
}
public TezProcessorContext getTezEngineTaskContext() {
- return tezEngineTaskContext;
+ return processorContext;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
index ab4fc8e..b372536 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
@@ -47,6 +47,7 @@ import org.apache.tez.engine.newapi.TezProcessorContext;
import org.apache.tez.mapreduce.hadoop.newmapreduce.MapContextImpl;
import org.apache.tez.mapreduce.newinput.SimpleInput;
import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
+import org.apache.tez.mapreduce.newoutput.SimpleOutput;
import org.apache.tez.mapreduce.newprocessor.MRTask;
import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
@@ -85,7 +86,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
- LOG.info("Running map: " + tezEngineTaskContext.getUniqueIdentifier());
+ LOG.info("Running map: " + processorContext.getUniqueIdentifier());
initTask();
@@ -112,15 +113,11 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
KVWriter kvWriter = ((OnFileSortedOutput)out).getWriter();
- //TODO fix committer
-// if (out instanceof SimpleOutput) {
-// initCommitter(jobConf, useNewApi, false);
-// ((SimpleOutput)out).setTask(this);
-// } else if (out instanceof SortingOutput) {
-// initCommitter(jobConf, useNewApi, true);
-// initPartitioner(jobConf);
-// ((SortingOutput)out).setTask(this);
-// }
+ if (out instanceof SimpleOutput) {
+ initCommitter(jobConf, useNewApi, false);
+ } else { // Assuming no other output needs commit.
+ initCommitter(jobConf, useNewApi, true);
+ }
if (useNewApi) {
runNewMapper(jobConf, mrReporter, input, kvWriter, doingShuffle);
@@ -195,7 +192,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
job, taskAttemptId,
input, output,
getCommitter(),
- tezEngineTaskContext, split);
+ processorContext, split);
org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
new WrappedMapper().getMapContext(mapContext);
@@ -342,12 +339,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
@Override
public TezCounter getOutputRecordsCounter() {
- return tezEngineTaskContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+ return processorContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
}
@Override
public TezCounter getInputRecordsCounter() {
- return tezEngineTaskContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
+ return processorContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7b591d7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
index e597979..a5bf16b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
@@ -95,7 +95,7 @@ implements LogicalIOProcessor {
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
- LOG.info("Running reduce: " + tezEngineTaskContext.getUniqueIdentifier());
+ LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
initTask();
@@ -112,13 +112,11 @@ implements LogicalIOProcessor {
Input in = inputs.values().iterator().next();
Output out = outputs.values().iterator().next();
-// if (out instanceof SimpleOutput) {
-// initCommitter(jobConf, useNewApi, false);
-// } else if (out instanceof SortingOutput) {
-// initCommitter(jobConf, useNewApi, true);
-// initPartitioner(jobConf);
-// ((SortingOutput)out).setTask(this);
-// }
+ if (out instanceof SimpleOutput) {
+ initCommitter(jobConf, useNewApi, false);
+ } else {
+ initCommitter(jobConf, useNewApi, true);
+ }
this.statusUpdate();
@@ -352,12 +350,12 @@ implements LogicalIOProcessor {
@Override
public TezCounter getOutputRecordsCounter() {
- return tezEngineTaskContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
+ return processorContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
}
@Override
public TezCounter getInputRecordsCounter() {
- return tezEngineTaskContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+ return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
}
}