You are viewing a plain text version of this content; the canonical (hyperlinked) version was not preserved in this rendering.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/22 01:48:27 UTC

git commit: TEZ-631. [MR Support] Some mappers/reducers require TaskAttemptID to be present in JobConf. (hitesh)

Updated Branches:
  refs/heads/master b885eca72 -> 3d646ab2a


TEZ-631. [MR Support] Some mappers/reducers require TaskAttemptID to be present in JobConf. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3d646ab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3d646ab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3d646ab2

Branch: refs/heads/master
Commit: 3d646ab2a97cb5b5255bd8e4ea0fbba635e0c41c
Parents: b885eca
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Nov 21 16:48:04 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Nov 21 16:48:04 2013 -0800

----------------------------------------------------------------------
 .../org/apache/tez/mapreduce/input/MRInput.java | 23 ++++++++++++++----
 .../apache/tez/mapreduce/output/MROutput.java   | 25 ++++++++++----------
 .../apache/tez/mapreduce/processor/MRTask.java  |  2 ++
 3 files changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d646ab2/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index c84d48c..3c6f46b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -40,7 +40,10 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -117,14 +120,26 @@ public class MRInput implements LogicalInput {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
-    MRInputUserPayloadProto mrUserPayload = MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
+    MRInputUserPayloadProto mrUserPayload =
+      MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
         "All split information not expected in MRInput");
-    Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
+    Configuration conf =
+      MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
     this.jobConf = new JobConf(conf);
 
-    
-    
+    TaskAttemptID taskAttemptId = new TaskAttemptID(
+      new TaskID(
+        Long.toString(inputContext.getApplicationId().getClusterTimestamp()),
+        inputContext.getApplicationId().getId(), TaskType.MAP,
+        inputContext.getTaskIndex()),
+      inputContext.getTaskAttemptNumber());
+
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
+      taskAttemptId.toString());
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+      inputContext.getDAGAttemptNumber());
+
     // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
     // theory, can be used by the MapProcessor, ReduceProcessor or a custom
     // processor. (The processor could provide the counter though)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d646ab2/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index f4c334d..fbb155c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -104,6 +104,18 @@ public class MROutput implements LogicalOutput {
         false);
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         outputContext.getDAGAttemptNumber());
+    TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
+      outputContext.getApplicationId().getClusterTimestamp()),
+      outputContext.getApplicationId().getId(),
+      (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
+      outputContext.getTaskIndex()),
+      outputContext.getTaskAttemptNumber());
+    jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+    jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+    jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+    jobConf.setInt(JobContext.TASK_PARTITION,
+      taskAttemptId.getTaskID().getId());
+    jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
 
     outputRecordCounter = outputContext.getCounters().findCounter(
         TaskCounter.MAP_OUTPUT_RECORDS);
@@ -141,19 +153,6 @@ public class MROutput implements LogicalOutput {
       long bytesOutCurr = getOutputBytes();
       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     } else {
-      TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
-          outputContext.getApplicationId().getClusterTimestamp()),
-          outputContext.getApplicationId().getId(),
-          (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
-          outputContext.getTaskIndex()),
-          outputContext.getTaskAttemptNumber());
-      jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
-      jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
-      jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
-      jobConf.setInt(JobContext.TASK_PARTITION,
-          taskAttemptId.getTaskID().getId());
-      jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
-
       oldApiTaskAttemptContext =
           new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
               jobConf, taskAttemptId,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d646ab2/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 5471c55..7383122 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -157,6 +157,8 @@ public abstract class MRTask {
     }
     jobConf.set(Constants.TEZ_RUNTIME_TASK_ATTEMPT_ID,
         taskAttemptId.toString());
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
+      taskAttemptId.toString());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         context.getDAGAttemptNumber());