You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/09/29 20:45:54 UTC
tez git commit: TEZ-2851. Support a way for upstream applications to
pass in a caller context to Tez. (hitesh)
Repository: tez
Updated Branches:
refs/heads/master 83a3c989b -> 774444312
TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/77444431
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/77444431
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/77444431
Branch: refs/heads/master
Commit: 774444312f8ea586939cb85c140c94251162e731
Parents: 83a3c98
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 29 11:45:36 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 29 11:45:36 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/tez/client/CallerContext.java | 171 +++++++++++++++++++
.../org/apache/tez/common/ATSConstants.java | 10 +-
.../java/org/apache/tez/common/TezUtils.java | 2 +-
.../main/java/org/apache/tez/dag/api/DAG.java | 20 +++
.../apache/tez/dag/api/DagTypeConverters.java | 28 ++-
tez-api/src/main/proto/DAGApiRecords.proto | 8 +
.../java/org/apache/tez/dag/api/TestDAG.java | 24 +++
.../org/apache/tez/common/TestTezUtils.java | 12 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 10 +-
.../impl/HistoryEventJsonConversion.java | 17 ++
.../apache/tez/dag/history/utils/DAGUtils.java | 24 ++-
.../tez/dag/history/utils/TestDAGUtils.java | 13 +-
.../apache/tez/examples/OrderedWordCount.java | 1 +
.../org/apache/tez/examples/TezExampleBase.java | 10 ++
.../tez/history/parser/datamodel/Constants.java | 5 +
.../tez/history/parser/datamodel/DagInfo.java | 30 ++++
.../apache/tez/history/TestHistoryParser.java | 18 ++
.../ats/HistoryEventTimelineConversion.java | 19 ++-
.../ats/TestHistoryEventTimelineConversion.java | 28 ++-
.../examples/TestOrderedWordCount.java | 3 +-
21 files changed, 439 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f65cfda..d5e8141 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
TEZ-2859. TestMergeManager.testLocalDiskMergeMultipleTasks failing
TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker.
TEZ-2857. Fix flakey tests in TestDAGImpl.
@@ -194,6 +195,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES
+ TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker.
TEZ-2857. Fix flakey tests in TestDAGImpl.
TEZ-2398. Flaky test: TestFaultTolerance
@@ -464,6 +466,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2808. Race condition between preemption and container assignment
TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/CallerContext.java b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
new file mode 100644
index 0000000..ba68851
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import com.google.common.base.Preconditions;
+
+@Public
+@Unstable
+public class CallerContext {
+
+ /**
+ * Context in which Tez is being invoked.
+ * For example, HIVE or PIG.
+ */
+ private String context;
+
+ /**
+ * Type of the caller. Should ideally be used along with callerId to uniquely identify the caller.
+ * When used with YARN Timeline, this should map to the Timeline Entity Type.
+ * For example, HIVE_QUERY_ID.
+ */
+ private String callerType;
+
+ /**
+ * Caller ID.
+ * An ID to uniquely identify the caller within the callerType namespace
+ */
+ private String callerId;
+
+ /**
+ * Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For example, for Hive,
+ * this could be the Hive query text.
+ */
+ private String blob;
+
+ /**
+ * Private Constructor
+ */
+ private CallerContext() {
+ }
+
+ /**
+ * Instantiate the Caller Context
+ * @param context Context in which Tez is being invoked. For example, HIVE or PIG.
+ * @param callerId Caller ID. An ID to uniquely identifier the caller within the callerType
+ * namespace
+ * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely
+ * identify the caller. When used with YARN Timeline, this should map to
+ * the Timeline Entity Type. For example, HIVE_QUERY_ID.
+ * @param blob Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For example, for Hive,
+ * this could be the Hive query text.
+ * @return CallerContext
+ */
+ public static CallerContext create(String context, String callerId,
+ String callerType, @Nullable String blob) {
+ return new CallerContext(context, callerId, callerType, blob);
+ }
+
+ /**
+ * Instantiate the Caller Context
+ * @param context Context in which Tez is being invoked. For example, HIVE or PIG.
+ * @param blob Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For example, for Hive,
+ * this could be the Hive query text.
+ * @return CallerContext
+ */
+ @Private
+ public static CallerContext create(String context, @Nullable String blob) {
+ return new CallerContext(context, blob);
+ }
+
+
+ private CallerContext(String context, String callerId, String callerType,
+ @Nullable String blob) {
+ if (callerId != null || callerType != null) {
+ setCallerIdAndType(callerId, callerType);
+ }
+ setContext(context);
+ setBlob(blob);
+ }
+
+ private CallerContext(String context, @Nullable String blob) {
+ setContext(context);
+ setBlob(blob);
+ }
+
+ public String getCallerType() {
+ return callerType;
+ }
+
+ public String getCallerId() {
+ return callerId;
+ }
+
+ public String getBlob() {
+ return blob;
+ }
+
+ public String getContext() {
+ return context;
+ }
+
+ /**
+ * @param context Context in which Tez is being invoked. For example, HIVE or PIG.
+ */
+ public CallerContext setContext(String context) {
+ Preconditions.checkArgument(context != null && !context.isEmpty(),
+ "Context cannot be null or empty");
+ this.context = context;
+ return this;
+ }
+
+ /**
+ * @param callerId Caller ID. An ID to uniquely identifier the caller within the callerType
+ * namespace
+ * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely
+ * identify the caller. When used with YARN Timeline, this should map to
+ * the Timeline Entity Type. For example, HIVE_QUERY_ID.
+ */
+ public CallerContext setCallerIdAndType(String callerId, String callerType) {
+ Preconditions.checkArgument(callerType != null && !callerType.isEmpty()
+ && callerId != null && !callerId.isEmpty(),
+ "Caller Id and Caller Type cannot be null or empty");
+ this.callerType = callerType;
+ this.callerId = callerId;
+ return this;
+ }
+
+ /**
+ * @param blob Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For example, for Hive,
+ * this could be the Hive query text.
+ */
+ public CallerContext setBlob(@Nullable String blob) {
+ this.blob = blob;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "context=" + context
+ + ", callerType=" + callerType
+ + ", callerId=" + callerId
+ + ", blob=" + blob;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index f786a4e..7204943 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -45,6 +45,8 @@ public class ATSConstants {
public static final String NODE_ID = "nodeId";
public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress";
public static final String USER = "user";
+ public static final String CALLER_CONTEXT_ID = "callerId";
+ public static final String CALLER_CONTEXT_TYPE = "callerType";
/* Keys used in other info */
public static final String APP_SUBMIT_TIME = "appSubmitTime";
@@ -108,7 +110,7 @@ public class ATSConstants {
"yarn.timeline-service.webapp.https.address";
/* History text related Keys */
- public static final String DESCRIPTION = "desc";
+ public static final String DESC = "desc";
public static final String CONFIG = "config";
public static final String TEZ_VERSION = "tezVersion";
@@ -116,4 +118,10 @@ public class ATSConstants {
public static final String REVISION = "revision";
public static final String BUILD_TIME = "buildTime";
+ /* Caller Context Related Keys */
+ public static final String CONTEXT = "context";
+ public static final String CALLER_ID = "callerId";
+ public static final String CALLER_TYPE = "callerType";
+ public static final String DESCRIPTION = "description";
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 8c2f118..93d373b 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -162,7 +162,7 @@ public class TezUtils {
JSONObject jsonObject = new JSONObject();
try {
if (description != null && !description.isEmpty()) {
- jsonObject.put(ATSConstants.DESCRIPTION, description);
+ jsonObject.put(ATSConstants.DESC, description);
}
if (conf != null) {
JSONObject confJson = new JSONObject();
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index ad656cd..68b6d52 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -33,9 +33,11 @@ import java.util.Stack;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.common.JavaOptsChecker;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.slf4j.Logger;
@@ -95,6 +97,7 @@ public class DAG {
private DAGAccessControls dagAccessControls;
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
String dagInfo;
+ CallerContext callerContext;
private Map<String,String> dagConf = new HashMap<String, String>();
private VertexExecutionContext defaultExecutionContext;
@@ -170,12 +173,25 @@ public class DAG {
* In the case of Hive, this could be the SQL query text.
* @return {@link DAG}
*/
+ @Deprecated
public synchronized DAG setDAGInfo(String dagInfo) {
Preconditions.checkNotNull(dagInfo);
this.dagInfo = dagInfo;
return this;
}
+
+ /**
+ * Set the Context in which Tez is being called.
+ * @param callerContext Caller Context
+ * @return {@link DAG}
+ */
+ public synchronized DAG setCallerContext(CallerContext callerContext) {
+ Preconditions.checkNotNull(callerContext);
+ this.callerContext = callerContext;
+ return this;
+ }
+
/**
* Create a group of vertices that share a common output. This can be used to implement
* unions efficiently.
@@ -730,6 +746,10 @@ public class DAG {
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
dagBuilder.setName(this.name);
+
+ if (this.callerContext != null) {
+ dagBuilder.setCallerContext(DagTypeConverters.convertCallerContextToProto(callerContext));
+ }
if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
dagBuilder.setDagInfo(this.dagInfo);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 2823a86..5733da8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.CounterGroup;
@@ -57,6 +58,7 @@ import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -523,7 +525,7 @@ public class DagTypeConverters {
PlanLocalResourcesProto.newBuilder();
for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
PlanLocalResource plr = convertLocalResourceToPlanLocalResource(
- entry.getKey(), entry.getValue());
+ entry.getKey(), entry.getValue());
builder.addLocalResources(plr);
}
return builder.build();
@@ -837,4 +839,28 @@ public class DagTypeConverters {
return pluginDescriptorBuilder.build();
}
+ public static CallerContextProto convertCallerContextToProto(CallerContext callerContext) {
+ CallerContextProto.Builder callerContextBuilder = CallerContextProto.newBuilder();
+ callerContextBuilder.setContext(callerContext.getContext());
+ if (callerContext.getCallerId() != null) {
+ callerContextBuilder.setCallerId(callerContext.getCallerId());
+ }
+ if (callerContext.getCallerType() != null) {
+ callerContextBuilder.setCallerType(callerContext.getCallerType());
+ }
+ if (callerContext.getBlob() != null) {
+ callerContextBuilder.setBlob(callerContext.getBlob());
+ }
+ return callerContextBuilder.build();
+ }
+
+ public static CallerContext convertCallerContextFromProto(CallerContextProto proto) {
+ CallerContext callerContext = CallerContext.create(proto.getContext(),
+ (proto.hasBlob() ? proto.getBlob() : null));
+ if (proto.hasCallerType() && proto.hasCallerId()) {
+ callerContext.setCallerIdAndType(proto.getCallerId(), proto.getCallerType());
+ }
+ return callerContext;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 193f7b8..d016d60 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -190,6 +190,13 @@ message ConfigurationProto {
optional AMPluginDescriptorProto am_plugin_descriptor = 2;
}
+message CallerContextProto {
+ optional string context = 1;
+ optional string callerType = 2;
+ optional string callerId = 3;
+ optional string blob = 4;
+}
+
message DAGPlan {
required string name = 1;
repeated VertexPlan vertex = 2;
@@ -200,6 +207,7 @@ message DAGPlan {
repeated PlanLocalResource local_resource = 7;
optional string dag_info = 8;
optional VertexExecutionContextProto default_execution_context = 9;
+ optional CallerContextProto caller_context = 10;
}
// DAG monitoring messages
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 268267b..24c20b5 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.api;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -308,4 +309,27 @@ public class TestDAG {
}
}
+ @Test
+ public void testCallerContext() {
+ DAG dag = DAG.create("dag1");
+ try {
+ CallerContext callerContext = CallerContext.create("ctxt", "", "", "desc");
+ Assert.fail("Expected failure for invalid args");
+ } catch (Exception e) {
+ // Expected
+ }
+ try {
+ CallerContext callerContext = CallerContext.create("", "desc");
+ Assert.fail("Expected failure for invalid args");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ CallerContext callerContext;
+ callerContext = CallerContext.create("ctxt", "a", "a", "desc");
+ callerContext = CallerContext.create("ctxt", "desc");
+ callerContext = CallerContext.create("ctxt", null);
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index d39c47f..c88fa67 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -168,7 +168,7 @@ public class TestTezUtils {
JSONObject jsonObject = new JSONObject(confToJson);
- Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+ Assert.assertFalse(jsonObject.has(ATSConstants.DESC));
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
@@ -178,8 +178,8 @@ public class TestTezUtils {
confToJson = TezUtils.convertToHistoryText(desc, conf);
jsonObject = new JSONObject(confToJson);
- Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
- String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+ Assert.assertTrue(jsonObject.has(ATSConstants.DESC));
+ String descFromJson = jsonObject.getString(ATSConstants.DESC);
Assert.assertEquals(desc, descFromJson);
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
@@ -201,7 +201,7 @@ public class TestTezUtils {
JSONObject jsonObject = new JSONObject(confToJson);
- Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+ Assert.assertFalse(jsonObject.has(ATSConstants.DESC));
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
@@ -213,8 +213,8 @@ public class TestTezUtils {
confToJson = TezUtils.convertToHistoryText(desc, conf);
jsonObject = new JSONObject(confToJson);
- Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
- String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+ Assert.assertTrue(jsonObject.has(ATSConstants.DESC));
+ String descFromJson = jsonObject.getString(ATSConstants.DESC);
Assert.assertEquals(desc, descFromJson);
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e41d59c..e165397 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -63,6 +63,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
@@ -2303,7 +2304,14 @@ public class DAGAppMaster extends AbstractService {
cumulativeAdditionalResources.putAll(lrDiff);
}
- LOG.info("Running DAG: " + dagPlan.getName());
+ String callerContextStr = "";
+ if (dagPlan.hasCallerContext()) {
+ CallerContext callerContext = DagTypeConverters.convertCallerContextFromProto(
+ dagPlan.getCallerContext());
+ callerContextStr = ", callerContext=" + callerContext.toString();
+ }
+ LOG.info("Running DAG: " + dagPlan.getName() + callerContextStr);
+
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
System.err.println(timeStamp + " Running Dag: " + newDAG.getID());
System.out.println(timeStamp + " Running Dag: "+ newDAG.getID());
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 649eb61..bf63045 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -483,6 +483,15 @@ public class HistoryEventJsonConversion {
JSONObject primaryFilters = new JSONObject();
primaryFilters.put(ATSConstants.DAG_NAME,
event.getDAGName());
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
+
jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
// TODO decide whether this goes into different events,
@@ -499,6 +508,14 @@ public class HistoryEventJsonConversion {
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.DAG_PLAN,
DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ otherInfo.put(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 76e592e..781120c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -49,10 +49,13 @@ import org.apache.tez.dag.records.TezTaskID;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
+import com.google.common.base.Preconditions;
+
public class DAGUtils {
public static final String DAG_NAME_KEY = "dagName";
public static final String DAG_INFO_KEY = "dagInfo";
+ public static final String DAG_CONTEXT_KEY = "dagContext";
public static final String VERTICES_KEY = "vertices";
public static final String EDGES_KEY = "edges";
public static final String VERTEX_GROUPS_KEY = "vertexGroups";
@@ -165,15 +168,34 @@ public class DAGUtils {
return object;
}
+ static Map<String, String> createDagInfoMap(DAGPlan dagPlan) {
+ Preconditions.checkArgument(dagPlan.hasCallerContext());
+ Map<String, String> dagInfo = new TreeMap<String, String>();
+ dagInfo.put(ATSConstants.CONTEXT, dagPlan.getCallerContext().getContext());
+ if (dagPlan.getCallerContext().hasCallerId()) {
+ dagInfo.put(ATSConstants.CALLER_ID, dagPlan.getCallerContext().getCallerId());
+ }
+ if (dagPlan.getCallerContext().hasCallerType()) {
+ dagInfo.put(ATSConstants.CALLER_TYPE, dagPlan.getCallerContext().getCallerType());
+ }
+ if (dagPlan.getCallerContext().hasBlob()) {
+ dagInfo.put(ATSConstants.DESCRIPTION, dagPlan.getCallerContext().getBlob());
+ }
+ return dagInfo;
+ }
+
public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException {
final String VERSION_KEY = "version";
- final int version = 1;
+ final int version = 2;
Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
dagMap.put(DAG_NAME_KEY, dagPlan.getName());
if (dagPlan.hasDagInfo()) {
dagMap.put(DAG_INFO_KEY, dagPlan.getDagInfo());
}
+ if (dagPlan.hasCallerContext()) {
+ dagMap.put(DAG_CONTEXT_KEY, createDagInfoMap(dagPlan));
+ }
dagMap.put(VERSION_KEY, version);
ArrayList<Object> verticesList = new ArrayList<Object>();
for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index cb7e0c8..4d4577a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -28,6 +28,8 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.CallerContext;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -53,6 +55,7 @@ import com.google.common.collect.Sets;
public class TestDAGUtils {
+ @SuppressWarnings("deprecation")
private DAGPlan createDAG() {
// Create a plan with 3 vertices: A, B, C. Group(A,B)->C
Configuration conf = new Configuration(false);
@@ -71,6 +74,7 @@ public class TestDAGUtils {
dummyTaskCount, dummyTaskResource);
DAG dag = DAG.create("testDag");
+ dag.setCallerContext(CallerContext.create("context1", "callerId1", "callerType1", "desc1"));
dag.setDAGInfo("dagInfo");
String groupName1 = "uv12";
org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
@@ -113,10 +117,17 @@ public class TestDAGUtils {
Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
Assert.assertEquals("testDag", atsMap.get(DAGUtils.DAG_NAME_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_INFO_KEY));
+ Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_CONTEXT_KEY));
+ Map<String, String> contextMap = (Map<String, String>)atsMap.get(DAGUtils.DAG_CONTEXT_KEY);
+ Assert.assertEquals("context1", contextMap.get(ATSConstants.CONTEXT));
+ Assert.assertEquals("callerId1", contextMap.get(ATSConstants.CALLER_ID));
+ Assert.assertEquals("callerType1", contextMap.get(ATSConstants.CALLER_TYPE));
+ Assert.assertEquals("desc1", contextMap.get(ATSConstants.DESCRIPTION));
+
Assert.assertEquals("dagInfo", atsMap.get(DAGUtils.DAG_INFO_KEY));
Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
Assert.assertTrue(atsMap.containsKey("version"));
- Assert.assertEquals(1, atsMap.get("version"));
+ Assert.assertEquals(2, atsMap.get("version"));
Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index 5e89fdc..fff7c1b 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -20,6 +20,7 @@ package org.apache.tez.examples;
import java.io.IOException;
+import org.apache.tez.client.CallerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index fb33612..c88c833 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -27,6 +27,8 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.commons.cli.Options;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.CallerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -138,6 +140,14 @@ public abstract class TezExampleBase extends Configured implements Tool {
public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException,
InterruptedException, IOException {
tezClientInternal.waitTillReady();
+
+ CallerContext callerContext = CallerContext.create("TezExamples",
+ "Tez Example DAG: " + dag.getName());
+ ApplicationId appId = tezClientInternal.getAppMasterApplicationId();
+ if (appId != null) {
+ callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication");
+ }
+
DAGClient dagClient = tezClientInternal.submitDAG(dag);
Set<StatusGetOpts> getOpts = Sets.newHashSet();
if (printCounters) {
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
index 3a24f15..dce79e2 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
@@ -50,10 +50,15 @@ public class Constants extends ATSConstants {
public static final String INITIALIZER = "initializer";
public static final String USER_PAYLOAD_TEXT = "userPayloadAsText";
+ public static final String DAG_CONTEXT = "dagContext";
+
//constants for ATS data export
public static final String DAG = "dag";
public static final String VERTICES = "vertices";
public static final String TASKS = "tasks";
public static final String TASK_ATTEMPTS = "task_attempts";
public static final String APPLICATION = "application";
+
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
index 5ea94d6..5fb760c 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections.bidimap.DualHashBidiMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.event.VertexState;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
@@ -65,6 +66,7 @@ public class DagInfo extends BaseInfo {
private final String status;
private final String diagnostics;
private VersionInfo versionInfo;
+ private CallerContext callerContext;
//VertexID --> VertexName & vice versa
private final BidiMap vertexNameIDMapping;
@@ -135,10 +137,34 @@ public class DagInfo extends BaseInfo {
}
private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
+ int version = dagPlan.optInt(Constants.VERSION, 1);
parseEdges(dagPlan.optJSONArray(Constants.EDGES));
JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES);
parseBasicVertexInfo(verticesInfo);
+
+ if (version > 1) {
+ parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT));
+ }
+ }
+
+ private void parseDAGContext(JSONObject callerContextInfo) {
+ if (callerContextInfo == null) {
+ LOG.info("No DAG Caller Context available");
+ return;
+ }
+ String context = callerContextInfo.optString(Constants.CONTEXT);
+ String callerId = callerContextInfo.optString(Constants.CALLER_ID);
+ String callerType = callerContextInfo.optString(Constants.CALLER_TYPE);
+ String description = callerContextInfo.optString(Constants.DESCRIPTION);
+
+ this.callerContext = CallerContext.create(context, description);
+ if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) {
+ this.callerContext.setCallerIdAndType(callerId, callerType);
+ } else {
+ LOG.info("No DAG Caller Context Id and Type available");
+ }
+
}
private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
@@ -329,6 +355,10 @@ public class DagInfo extends BaseInfo {
return versionInfo;
}
+ public final CallerContext getCallerContext() {
+ return callerContext;
+ }
+
public final String getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index a1b0ba6..dedd9ef 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
@@ -645,6 +647,16 @@ public class TestHistoryParser {
Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
TezClient tezClient = getTezClient(withTimeline);
+
+ // Update Caller Context
+ CallerContext callerContext = CallerContext.create("TezExamples", "Tez WordCount Example Job");
+ ApplicationId appId = tezClient.getAppMasterApplicationId();
+ if (appId == null) {
+ appId = ApplicationId.newInstance(1001l, 1);
+ }
+ callerContext.setCallerIdAndType(appId.toString(), "TezApplication");
+ dag.setCallerContext(callerContext);
+
DAGClient client = tezClient.submitDAG(dag);
client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
@@ -690,6 +702,12 @@ public class TestHistoryParser {
assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
assertTrue(dagInfo.getTimeTaken() > 0);
+ assertNotNull(dagInfo.getCallerContext());
+ assertEquals("TezExamples", dagInfo.getCallerContext().getContext());
+ assertEquals("Tez WordCount Example Job", dagInfo.getCallerContext().getBlob());
+ assertNotNull(dagInfo.getCallerContext().getCallerId());
+ assertEquals("TezApplication", dagInfo.getCallerContext().getCallerType());
+
//Verify all vertices
for (VertexInfo vertexInfo : dagInfo.getVertices()) {
verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 77b00c4..0d6cbcb 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -248,7 +248,7 @@ public class HistoryEventTimelineConversion {
event.getApplicationAttemptId().getApplicationId().toString());
atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID,
- event.getContainerId().toString());
+ event.getContainerId().toString());
atsEntity.setStartTime(event.getLaunchTime());
TimelineEvent launchEvt = new TimelineEvent();
@@ -391,6 +391,15 @@ public class HistoryEventTimelineConversion {
atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
event.getDagID().getApplicationId().toString());
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
+
try {
atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
@@ -405,6 +414,14 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs());
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 4245be3..0ad1b43 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.counters.TezCounters;
@@ -43,6 +44,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
@@ -112,7 +114,13 @@ public class TestHistoryEventTimelineConversion {
tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
- dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+ CallerContextProto.Builder callerContextProto = CallerContextProto.newBuilder();
+ callerContextProto.setContext("ctxt");
+ callerContextProto.setCallerId("Caller_ID");
+ callerContextProto.setCallerType("Caller_Type");
+ callerContextProto.setBlob("Desc_1");
+ dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock")
+ .setCallerContext(callerContextProto).build();
containerId = ContainerId.newInstance(applicationAttemptId, 111);
nodeId = NodeId.newInstance("node", 13435);
}
@@ -425,18 +433,24 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
- Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+ Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(
dagPlan.getName()));
Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_ID).contains(
+ dagPlan.getCallerContext().getCallerId()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_TYPE).contains(
+ dagPlan.getCallerContext().getCallerType()));
+ Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationAttemptId.getApplicationId().toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
- Assert.assertEquals(6, timelineEntity.getOtherInfo().size());
+ Assert.assertEquals(8, timelineEntity.getOtherInfo().size());
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
Assert.assertEquals(applicationId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
@@ -451,6 +465,14 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(containerLogs,
timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ applicationAttemptId.getAttemptId()));
+ Assert.assertEquals(
+ timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_ID),
+ dagPlan.getCallerContext().getCallerId());
+ Assert.assertEquals(
+ timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE),
+ dagPlan.getCallerContext().getCallerType());
+
+
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index eb56795..6966e8d 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
@@ -276,7 +277,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
vertices.add(finalReduceVertex);
DAG dag = DAG.create("OrderedWordCount" + dagIndex);
- dag.setDAGInfo("{ \"context\": \"Tez\", \"description\": \"TestOrderedWordCount Job\" }");
+ dag.setCallerContext(CallerContext.create("Tez", "TestOrderedWordCount Job"));
for (int i = 0; i < vertices.size(); ++i) {
dag.addVertex(vertices.get(i));
}