You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/22 01:48:27 UTC
git commit: TEZ-631. [MR Support] Some mappers/reducers require
TaskAttemptID to be present in JobConf. (hitesh)
Updated Branches:
refs/heads/master b885eca72 -> 3d646ab2a
TEZ-631. [MR Support] Some mappers/reducers require TaskAttemptID to be present in JobConf. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3d646ab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3d646ab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3d646ab2
Branch: refs/heads/master
Commit: 3d646ab2a97cb5b5255bd8e4ea0fbba635e0c41c
Parents: b885eca
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Nov 21 16:48:04 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Nov 21 16:48:04 2013 -0800
----------------------------------------------------------------------
.../org/apache/tez/mapreduce/input/MRInput.java | 23 ++++++++++++++----
.../apache/tez/mapreduce/output/MROutput.java | 25 ++++++++++----------
.../apache/tez/mapreduce/processor/MRTask.java | 2 ++
3 files changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d646ab2/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index c84d48c..3c6f46b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -40,7 +40,10 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -117,14 +120,26 @@ public class MRInput implements LogicalInput {
@Override
public List<Event> initialize(TezInputContext inputContext) throws IOException {
this.inputContext = inputContext;
- MRInputUserPayloadProto mrUserPayload = MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
+ MRInputUserPayloadProto mrUserPayload =
+ MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
"All split information not expected in MRInput");
- Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
+ Configuration conf =
+ MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
this.jobConf = new JobConf(conf);
-
-
+ TaskAttemptID taskAttemptId = new TaskAttemptID(
+ new TaskID(
+ Long.toString(inputContext.getApplicationId().getClusterTimestamp()),
+ inputContext.getApplicationId().getId(), TaskType.MAP,
+ inputContext.getTaskIndex()),
+ inputContext.getTaskAttemptNumber());
+
+ jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
+ taskAttemptId.toString());
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+ inputContext.getDAGAttemptNumber());
+
// TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
// theory, can be used by the MapProcessor, ReduceProcessor or a custom
// processor. (The processor could provide the counter though)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d646ab2/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index f4c334d..fbb155c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -104,6 +104,18 @@ public class MROutput implements LogicalOutput {
false);
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
outputContext.getDAGAttemptNumber());
+ TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
+ outputContext.getApplicationId().getClusterTimestamp()),
+ outputContext.getApplicationId().getId(),
+ (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
+ outputContext.getTaskIndex()),
+ outputContext.getTaskAttemptNumber());
+ jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+ jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+ jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+ jobConf.setInt(JobContext.TASK_PARTITION,
+ taskAttemptId.getTaskID().getId());
+ jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
outputRecordCounter = outputContext.getCounters().findCounter(
TaskCounter.MAP_OUTPUT_RECORDS);
@@ -141,19 +153,6 @@ public class MROutput implements LogicalOutput {
long bytesOutCurr = getOutputBytes();
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
} else {
- TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
- outputContext.getApplicationId().getClusterTimestamp()),
- outputContext.getApplicationId().getId(),
- (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
- outputContext.getTaskIndex()),
- outputContext.getTaskAttemptNumber());
- jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
- jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
- jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
- jobConf.setInt(JobContext.TASK_PARTITION,
- taskAttemptId.getTaskID().getId());
- jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
-
oldApiTaskAttemptContext =
new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
jobConf, taskAttemptId,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d646ab2/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 5471c55..7383122 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -157,6 +157,8 @@ public abstract class MRTask {
}
jobConf.set(Constants.TEZ_RUNTIME_TASK_ATTEMPT_ID,
taskAttemptId.toString());
+ jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
+ taskAttemptId.toString());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
context.getDAGAttemptNumber());