You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/08 22:04:31 UTC
tez git commit: TEZ-3290. Set full task attempt id string in MRInput
configuration object. Contributed by Prasanth Jayachandran. (cherry picked
from commit 2c212858ab91fb49ca8e6ab36bf51328adcccb43)
Repository: tez
Updated Branches:
refs/heads/branch-0.8 f9fb1f1d7 -> b5859b6f6
TEZ-3290. Set full task attempt id string in MRInput configuration
object. Contributed by Prasanth Jayachandran.
(cherry picked from commit 2c212858ab91fb49ca8e6ab36bf51328adcccb43)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b5859b6f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b5859b6f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b5859b6f
Branch: refs/heads/branch-0.8
Commit: b5859b6f68e9119a83938a1171ce903a46e4f942
Parents: f9fb1f1
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jun 8 15:02:50 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Jun 8 15:04:24 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/hadoop/MRInputHelpers.java | 40 ++++++++++++++++++++
.../org/apache/tez/mapreduce/input/MRInput.java | 4 ++
.../tez/mapreduce/input/base/MRInputBase.java | 13 +++++++
.../apache/tez/mapreduce/input/TestMRInput.java | 10 +++++
5 files changed, 68 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61770d0..c598deb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3290. Set full task attempt id string in MRInput configuration object.
TEZ-3278. Hide Swimlane from Tez UI
TEZ-3280. LOG MRInputHelpers split generation message as INFO
TEZ-909. Provide support for application tags
http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 034d379..2a806ab 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -730,6 +730,16 @@ public class MRInputHelpers {
}
/**
+ * Returns string representation of full DAG identifier
+ * @param conf configuration instance
+ * @return dag identifier
+ */
+ @Public
+ public static String getDagIdString(Configuration conf) {
+ return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ID);
+ }
+
+ /**
* @see {@link InputContext#getTaskVertexIndex}
* @param conf configuration instance
* @return vertex index
@@ -740,6 +750,16 @@ public class MRInputHelpers {
}
/**
+ * Returns string representation of full vertex identifier
+ * @param conf configuration instance
+ * @return vertex identifier
+ */
+ @Public
+ public static String getVertexIdString(Configuration conf) {
+ return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_ID);
+ }
+
+ /**
* @see {@link InputContext#getTaskIndex}
* @param conf configuration instance
* @return task index
@@ -750,6 +770,16 @@ public class MRInputHelpers {
}
/**
+ * Returns string representation of full task identifier
+ * @param conf configuration instance
+ * @return task identifier
+ */
+ @Public
+ public static String getTaskIdString(Configuration conf) {
+ return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ID);
+ }
+
+ /**
* @see {@link InputContext#getTaskAttemptNumber}
* @param conf configuration instance
* @return task attempt index
@@ -760,6 +790,16 @@ public class MRInputHelpers {
}
/**
+ * Returns string representation of full task attempt identifier
+ * @param conf configuration instance
+ * @return task attempt identifier
+ */
+ @Public
+ public static String getTaskAttemptIdString(Configuration conf) {
+ return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
+ }
+
+ /**
* @see {@link InputContext#getInputIndex}
* @param conf configuration instance
* @return input index
http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 4a4ba86..e859058 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -94,6 +94,10 @@ public class MRInput extends MRInputBase {
@Private public static final String TEZ_MAPREDUCE_APPLICATION_ID = "tez.mapreduce.application.id";
@Private public static final String TEZ_MAPREDUCE_UNIQUE_IDENTIFIER = "tez.mapreduce.unique.identifier";
@Private public static final String TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER = "tez.mapreduce.dag.attempt.number";
+ @Private public static final String TEZ_MAPREDUCE_DAG_ID= "tez.mapreduce.dag.id";
+ @Private public static final String TEZ_MAPREDUCE_VERTEX_ID = "tez.mapreduce.vertex.id";
+ @Private public static final String TEZ_MAPREDUCE_TASK_ID = "tez.mapreduce.task.id";
+ @Private public static final String TEZ_MAPREDUCE_TASK_ATTEMPT_ID = "tez.mapreduce.task.attempt.id";
http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index 230f55e..9a26c2b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -30,6 +30,10 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInput;
@@ -109,6 +113,15 @@ public abstract class MRInputBase extends AbstractLogicalInput {
jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier());
jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber());
+ TezDAGID tezDAGID = TezDAGID.getInstance(getContext().getApplicationId(), getContext().getDagIdentifier());
+ TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, getContext().getTaskVertexIndex());
+ TezTaskID tezTaskID = TezTaskID.getInstance(tezVertexID, getContext().getTaskIndex());
+ TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, getContext().getTaskAttemptNumber());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_ID, tezDAGID.toString());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_ID, tezVertexID.toString());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ID, tezTaskID.toString());
+ jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID, tezTaskAttemptID.toString());
+
this.inputRecordCounter = getContext().getCounters().findCounter(
TaskCounter.INPUT_RECORDS_PROCESSED);
http://git-wip-us.apache.org/repos/asf/tez/blob/b5859b6f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index b42ef25..b878416 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -97,6 +97,11 @@ public class TestMRInput {
private static final int TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX = 4000;
private static final int TEST_ATTRIBUTES_INPUT_INDEX = 5000;
private static final int TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER = 6000;
+ private static final String TEST_ATTRIBUTES_APPLICATION_ID_STRING = "application_0_0000";
+ private static final String TEST_ATTRIBUTES_DAG_ID_STRING = "dag_0_0000_1000";
+ private static final String TEST_ATTRIBUTES_VERTEX_ID_STRING = "vertex_0_0000_1000_2000";
+ private static final String TEST_ATTRIBUTES_TASK_ID_STRING = "task_0_0000_1000_2000_003000";
+ private static final String TEST_ATTRIBUTES_TASK_ATTEMPT_ID_STRING = "attempt_0_0000_1000_2000_003000_4000";
@Test(timeout = 5000)
public void testAttributesInJobConf() throws Exception {
@@ -163,6 +168,11 @@ public class TestMRInput {
assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX, MRInputHelpers.getTaskAttemptIndex(job));
assertEquals(TEST_ATTRIBUTES_INPUT_INDEX, MRInputHelpers.getInputIndex(job));
assertEquals(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER, MRInputHelpers.getDagAttemptNumber(job));
+ assertEquals(TEST_ATTRIBUTES_APPLICATION_ID_STRING, MRInputHelpers.getApplicationIdString(job));
+ assertEquals(TEST_ATTRIBUTES_DAG_ID_STRING, MRInputHelpers.getDagIdString(job));
+ assertEquals(TEST_ATTRIBUTES_VERTEX_ID_STRING, MRInputHelpers.getVertexIdString(job));
+ assertEquals(TEST_ATTRIBUTES_TASK_ID_STRING, MRInputHelpers.getTaskIdString(job));
+ assertEquals(TEST_ATTRIBUTES_TASK_ATTEMPT_ID_STRING, MRInputHelpers.getTaskAttemptIdString(job));
invoked.set(true);
return new RecordReader() {
@Override