You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/08 22:04:31 UTC

tez git commit: TEZ-3290. Set full task attempt id string in MRInput configuration object. Contributed by Prasanth Jayachandran. (cherry picked from commit 2c212858ab91fb49ca8e6ab36bf51328adcccb43)

Repository: tez
Updated Branches:
  refs/heads/branch-0.8 f9fb1f1d7 -> b5859b6f6


TEZ-3290. Set full task attempt id string in MRInput configuration
object. Contributed by Prasanth Jayachandran.
(cherry picked from commit 2c212858ab91fb49ca8e6ab36bf51328adcccb43)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b5859b6f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b5859b6f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b5859b6f

Branch: refs/heads/branch-0.8
Commit: b5859b6f68e9119a83938a1171ce903a46e4f942
Parents: f9fb1f1
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jun 8 15:02:50 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Jun 8 15:04:24 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/mapreduce/hadoop/MRInputHelpers.java    | 40 ++++++++++++++++++++
 .../org/apache/tez/mapreduce/input/MRInput.java |  4 ++
 .../tez/mapreduce/input/base/MRInputBase.java   | 13 +++++++
 .../apache/tez/mapreduce/input/TestMRInput.java | 10 +++++
 5 files changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61770d0..c598deb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3290. Set full task attempt id string in MRInput configuration object.
   TEZ-3278. Hide Swimlane from Tez UI
   TEZ-3280. LOG MRInputHelpers split generation message as INFO
   TEZ-909.  Provide support for application tags

http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 034d379..2a806ab 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -730,6 +730,16 @@ public class MRInputHelpers {
   }
 
   /**
+   * Returns string representation of full DAG identifier
+   * @param conf configuration instance
+   * @return dag identifier
+   */
+  @Public
+  public static String getDagIdString(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ID);
+  }
+
+  /**
    * * @see {@link InputContext#getTaskVertexIndex}
    * @param conf configuration instance
    * @return vertex index
@@ -740,6 +750,16 @@ public class MRInputHelpers {
   }
 
   /**
+   * Returns string representation of full vertex identifier
+   * @param conf configuration instance
+   * @return vertex identifier
+   */
+  @Public
+  public static String getVertexIdString(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_ID);
+  }
+
+  /**
    * @see {@link InputContext#getTaskIndex}
    * @param conf configuration instance
    * @return task index
@@ -750,6 +770,16 @@ public class MRInputHelpers {
   }
 
   /**
+   * Returns string representation of full task identifier
+   * @param conf configuration instance
+   * @return task identifier
+   */
+  @Public
+  public static String getTaskIdString(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ID);
+  }
+
+  /**
    * @see {@link InputContext#getTaskAttemptNumber}
    * @param conf configuration instance
    * @return task attempt index
@@ -760,6 +790,16 @@ public class MRInputHelpers {
   }
 
   /**
+   * Returns string representation of full task attempt identifier
+   * @param conf configuration instance
+   * @return task attempt identifier
+   */
+  @Public
+  public static String getTaskAttemptIdString(Configuration conf) {
+    return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
+  }
+
+  /**
    * @see {@link InputContext#getInputIndex}
    * @param conf configuration instance
    * @return input index

http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 4a4ba86..e859058 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -94,6 +94,10 @@ public class MRInput extends MRInputBase {
   @Private public static final String TEZ_MAPREDUCE_APPLICATION_ID = "tez.mapreduce.application.id";
   @Private public static final String TEZ_MAPREDUCE_UNIQUE_IDENTIFIER = "tez.mapreduce.unique.identifier";
   @Private public static final String TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER = "tez.mapreduce.dag.attempt.number";
+  @Private public static final String TEZ_MAPREDUCE_DAG_ID= "tez.mapreduce.dag.id";
+  @Private public static final String TEZ_MAPREDUCE_VERTEX_ID = "tez.mapreduce.vertex.id";
+  @Private public static final String TEZ_MAPREDUCE_TASK_ID = "tez.mapreduce.task.id";
+  @Private public static final String TEZ_MAPREDUCE_TASK_ATTEMPT_ID = "tez.mapreduce.task.attempt.id";
 
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index 230f55e..9a26c2b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -30,6 +30,10 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
@@ -109,6 +113,15 @@ public abstract class MRInputBase extends AbstractLogicalInput {
     jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier());
     jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber());
 
+    TezDAGID tezDAGID = TezDAGID.getInstance(getContext().getApplicationId(), getContext().getDagIdentifier());
+    TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, getContext().getTaskVertexIndex());
+    TezTaskID tezTaskID = TezTaskID.getInstance(tezVertexID, getContext().getTaskIndex());
+    TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, getContext().getTaskAttemptNumber());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_ID, tezDAGID.toString());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_ID, tezVertexID.toString());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ID, tezTaskID.toString());
+    jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID, tezTaskAttemptID.toString());
+
     this.inputRecordCounter = getContext().getCounters().findCounter(
         TaskCounter.INPUT_RECORDS_PROCESSED);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index b42ef25..b878416 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -97,6 +97,11 @@ public class TestMRInput {
   private static final int TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX = 4000;
   private static final int TEST_ATTRIBUTES_INPUT_INDEX = 5000;
   private static final int TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER = 6000;
+  private static final String TEST_ATTRIBUTES_APPLICATION_ID_STRING = "application_0_0000";
+  private static final String TEST_ATTRIBUTES_DAG_ID_STRING = "dag_0_0000_1000";
+  private static final String TEST_ATTRIBUTES_VERTEX_ID_STRING = "vertex_0_0000_1000_2000";
+  private static final String TEST_ATTRIBUTES_TASK_ID_STRING = "task_0_0000_1000_2000_003000";
+  private static final String TEST_ATTRIBUTES_TASK_ATTEMPT_ID_STRING = "attempt_0_0000_1000_2000_003000_4000";
 
   @Test(timeout = 5000)
   public void testAttributesInJobConf() throws Exception {
@@ -163,6 +168,11 @@ public class TestMRInput {
       assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX, MRInputHelpers.getTaskAttemptIndex(job));
       assertEquals(TEST_ATTRIBUTES_INPUT_INDEX, MRInputHelpers.getInputIndex(job));
       assertEquals(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER, MRInputHelpers.getDagAttemptNumber(job));
+      assertEquals(TEST_ATTRIBUTES_APPLICATION_ID_STRING, MRInputHelpers.getApplicationIdString(job));
+      assertEquals(TEST_ATTRIBUTES_DAG_ID_STRING, MRInputHelpers.getDagIdString(job));
+      assertEquals(TEST_ATTRIBUTES_VERTEX_ID_STRING, MRInputHelpers.getVertexIdString(job));
+      assertEquals(TEST_ATTRIBUTES_TASK_ID_STRING, MRInputHelpers.getTaskIdString(job));
+      assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_ID_STRING, MRInputHelpers.getTaskAttemptIdString(job));
       invoked.set(true);
       return new RecordReader() {
         @Override