You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/09/29 20:45:54 UTC

tez git commit: TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/master 83a3c989b -> 774444312


TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/77444431
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/77444431
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/77444431

Branch: refs/heads/master
Commit: 774444312f8ea586939cb85c140c94251162e731
Parents: 83a3c98
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 29 11:45:36 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 29 11:45:36 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/tez/client/CallerContext.java    | 171 +++++++++++++++++++
 .../org/apache/tez/common/ATSConstants.java     |  10 +-
 .../java/org/apache/tez/common/TezUtils.java    |   2 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  20 +++
 .../apache/tez/dag/api/DagTypeConverters.java   |  28 ++-
 tez-api/src/main/proto/DAGApiRecords.proto      |   8 +
 .../java/org/apache/tez/dag/api/TestDAG.java    |  24 +++
 .../org/apache/tez/common/TestTezUtils.java     |  12 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  10 +-
 .../impl/HistoryEventJsonConversion.java        |  17 ++
 .../apache/tez/dag/history/utils/DAGUtils.java  |  24 ++-
 .../tez/dag/history/utils/TestDAGUtils.java     |  13 +-
 .../apache/tez/examples/OrderedWordCount.java   |   1 +
 .../org/apache/tez/examples/TezExampleBase.java |  10 ++
 .../tez/history/parser/datamodel/Constants.java |   5 +
 .../tez/history/parser/datamodel/DagInfo.java   |  30 ++++
 .../apache/tez/history/TestHistoryParser.java   |  18 ++
 .../ats/HistoryEventTimelineConversion.java     |  19 ++-
 .../ats/TestHistoryEventTimelineConversion.java |  28 ++-
 .../examples/TestOrderedWordCount.java          |   3 +-
 21 files changed, 439 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f65cfda..d5e8141 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2859. TestMergeManager.testLocalDiskMergeMultipleTasks failing
   TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker.
   TEZ-2857. Fix flakey tests in TestDAGImpl.
@@ -194,6 +195,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker.
   TEZ-2857. Fix flakey tests in TestDAGImpl.
   TEZ-2398. Flaky test: TestFaultTolerance
@@ -464,6 +466,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2398. Flaky test: TestFaultTolerance
   TEZ-2808. Race condition between preemption and container assignment
   TEZ-2834. Make Tez preemption resilient to incorrect free resource reported

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/CallerContext.java b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
new file mode 100644
index 0000000..ba68851
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import com.google.common.base.Preconditions;
+
+@Public
+@Unstable
+public class CallerContext {
+
+  /**
+   * Context in which Tez is being invoked.
+   * For example, HIVE or PIG.
+   */
+  private String context;
+
+  /**
+   * Type of the caller. Should ideally be used along with callerId to uniquely identify the caller.
+   * When used with YARN Timeline, this should map to the Timeline Entity Type.
+   * For example, HIVE_QUERY_ID.
+   */
+  private String callerType;
+
+  /**
+   * Caller ID.
+   * An ID to uniquely identify the caller within the callerType namespace
+   */
+  private String callerId;
+
+  /**
+   * Free-form text or a json-representation of relevant meta-data.
+   * This can be used to describe the work being done. For example, for Hive,
+   * this could be the Hive query text.
+   */
+  private String blob;
+
+  /**
+   * Private Constructor
+   */
+  private CallerContext() {
+  }
+
+  /**
+   * Instantiate the Caller Context
+   * @param context Context in which Tez is being invoked. For example, HIVE or PIG.
+   * @param callerId Caller ID. An ID to uniquely identify the caller within the callerType
+   *                 namespace
+   * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely
+   *                   identify the caller. When used with YARN Timeline, this should map to
+   *                   the Timeline Entity Type. For example, HIVE_QUERY_ID.
+   * @param blob Free-form text or a json-representation of relevant meta-data.
+   *             This can be used to describe the work being done. For example, for Hive,
+   *             this could be the Hive query text.
+   * @return CallerContext
+   */
+  public static CallerContext create(String context, String callerId,
+      String callerType, @Nullable String blob) {
+    return new CallerContext(context, callerId, callerType, blob);
+  }
+
+  /**
+   * Instantiate the Caller Context
+   * @param context Context in which Tez is being invoked. For example, HIVE or PIG.
+   * @param blob Free-form text or a json-representation of relevant meta-data.
+   *             This can be used to describe the work being done. For example, for Hive,
+   *             this could be the Hive query text.
+   * @return CallerContext
+   */
+  @Private
+  public static CallerContext create(String context, @Nullable String blob) {
+    return new CallerContext(context, blob);
+  }
+
+
+  private CallerContext(String context, String callerId, String callerType,
+      @Nullable String blob) {
+    if (callerId != null || callerType != null) {
+      setCallerIdAndType(callerId, callerType);
+    }
+    setContext(context);
+    setBlob(blob);
+  }
+
+  private CallerContext(String context, @Nullable String blob) {
+    setContext(context);
+    setBlob(blob);
+  }
+
+  public String getCallerType() {
+    return callerType;
+  }
+
+  public String getCallerId() {
+    return callerId;
+  }
+
+  public String getBlob() {
+    return blob;
+  }
+
+  public String getContext() {
+    return context;
+  }
+
+  /**
+   * @param context Context in which Tez is being invoked. For example, HIVE or PIG.
+   */
+  public CallerContext setContext(String context) {
+    Preconditions.checkArgument(context != null && !context.isEmpty(),
+        "Context cannot be null or empty");
+    this.context = context;
+    return this;
+  }
+
+  /**
+   * @param callerId Caller ID. An ID to uniquely identify the caller within the callerType
+   *                 namespace
+   * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely
+   *                   identify the caller. When used with YARN Timeline, this should map to
+   *                   the Timeline Entity Type. For example, HIVE_QUERY_ID.
+   */
+  public CallerContext setCallerIdAndType(String callerId, String callerType) {
+    Preconditions.checkArgument(callerType != null && !callerType.isEmpty()
+        && callerId != null && !callerId.isEmpty(),
+        "Caller Id and Caller Type cannot be null or empty");
+    this.callerType = callerType;
+    this.callerId = callerId;
+    return this;
+  }
+
+  /**
+   * @param blob Free-form text or a json-representation of relevant meta-data.
+   *             This can be used to describe the work being done. For example, for Hive,
+   *             this could be the Hive query text.
+   */
+  public CallerContext setBlob(@Nullable String blob) {
+    this.blob = blob;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "context=" + context
+        + ", callerType=" + callerType
+        + ", callerId=" + callerId
+        + ", blob=" + blob;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index f786a4e..7204943 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -45,6 +45,8 @@ public class ATSConstants {
   public static final String NODE_ID = "nodeId";
   public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress";
   public static final String USER = "user";
+  public static final String CALLER_CONTEXT_ID = "callerId";
+  public static final String CALLER_CONTEXT_TYPE = "callerType";
 
   /* Keys used in other info */
   public static final String APP_SUBMIT_TIME = "appSubmitTime";
@@ -108,7 +110,7 @@ public class ATSConstants {
       "yarn.timeline-service.webapp.https.address";
 
   /* History text related Keys */
-  public static final String DESCRIPTION = "desc";
+  public static final String DESC = "desc";
   public static final String CONFIG = "config";
 
   public static final String TEZ_VERSION = "tezVersion";
@@ -116,4 +118,10 @@ public class ATSConstants {
   public static final String REVISION = "revision";
   public static final String BUILD_TIME = "buildTime";
 
+  /* Caller Context Related Keys */
+  public static final String CONTEXT = "context";
+  public static final String CALLER_ID = "callerId";
+  public static final String CALLER_TYPE = "callerType";
+  public static final String DESCRIPTION = "description";
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 8c2f118..93d373b 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -162,7 +162,7 @@ public class TezUtils {
     JSONObject jsonObject = new JSONObject();
     try {
       if (description != null && !description.isEmpty()) {
-        jsonObject.put(ATSConstants.DESCRIPTION, description);
+        jsonObject.put(ATSConstants.DESC, description);
       }
       if (conf != null) {
         JSONObject confJson = new JSONObject();

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index ad656cd..68b6d52 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -33,9 +33,11 @@ import java.util.Stack;
 import org.apache.commons.collections4.BidiMap;
 import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.common.JavaOptsChecker;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.slf4j.Logger;
@@ -95,6 +97,7 @@ public class DAG {
   private DAGAccessControls dagAccessControls;
   Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
   String dagInfo;
+  CallerContext callerContext;
   private Map<String,String> dagConf = new HashMap<String, String>();
   private VertexExecutionContext defaultExecutionContext;
 
@@ -170,12 +173,25 @@ public class DAG {
    *                                In the case of Hive, this could be the SQL query text.
    * @return {@link DAG}
    */
+  @Deprecated
   public synchronized DAG setDAGInfo(String dagInfo) {
     Preconditions.checkNotNull(dagInfo);
     this.dagInfo = dagInfo;
     return this;
   }
 
+
+  /**
+   * Set the Context in which Tez is being called.
+   * @param callerContext Caller Context
+   * @return {@link DAG}
+   */
+  public synchronized DAG setCallerContext(CallerContext callerContext) {
+    Preconditions.checkNotNull(callerContext);
+    this.callerContext = callerContext;
+    return this;
+  }
+
   /**
    * Create a group of vertices that share a common output. This can be used to implement 
    * unions efficiently.
@@ -730,6 +746,10 @@ public class DAG {
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
     dagBuilder.setName(this.name);
+
+    if (this.callerContext != null) {
+      dagBuilder.setCallerContext(DagTypeConverters.convertCallerContextToProto(callerContext));
+    }
     if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
       dagBuilder.setDagInfo(this.dagInfo);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 2823a86..5733da8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.CounterGroup;
@@ -57,6 +58,7 @@ import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -523,7 +525,7 @@ public class DagTypeConverters {
       PlanLocalResourcesProto.newBuilder();
     for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
       PlanLocalResource plr = convertLocalResourceToPlanLocalResource(
-        entry.getKey(), entry.getValue());
+          entry.getKey(), entry.getValue());
       builder.addLocalResources(plr);
     }
     return builder.build();
@@ -837,4 +839,28 @@ public class DagTypeConverters {
     return pluginDescriptorBuilder.build();
   }
 
+  public static CallerContextProto convertCallerContextToProto(CallerContext callerContext) {
+    CallerContextProto.Builder callerContextBuilder = CallerContextProto.newBuilder();
+    callerContextBuilder.setContext(callerContext.getContext());
+    if (callerContext.getCallerId() != null) {
+      callerContextBuilder.setCallerId(callerContext.getCallerId());
+    }
+    if (callerContext.getCallerType() != null) {
+      callerContextBuilder.setCallerType(callerContext.getCallerType());
+    }
+    if (callerContext.getBlob() != null) {
+      callerContextBuilder.setBlob(callerContext.getBlob());
+    }
+    return callerContextBuilder.build();
+  }
+
+  public static CallerContext convertCallerContextFromProto(CallerContextProto proto) {
+    CallerContext callerContext = CallerContext.create(proto.getContext(),
+        (proto.hasBlob() ? proto.getBlob() : null));
+    if (proto.hasCallerType() && proto.hasCallerId()) {
+      callerContext.setCallerIdAndType(proto.getCallerId(), proto.getCallerType());
+    }
+    return callerContext;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 193f7b8..d016d60 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -190,6 +190,13 @@ message ConfigurationProto {
   optional AMPluginDescriptorProto am_plugin_descriptor = 2;
 }
 
+message CallerContextProto {
+  optional string context = 1;
+  optional string callerType = 2;
+  optional string callerId = 3;
+  optional string blob = 4;
+}
+
 message DAGPlan {
   required string name = 1;
   repeated VertexPlan vertex = 2;
@@ -200,6 +207,7 @@ message DAGPlan {
   repeated PlanLocalResource local_resource = 7;
   optional string dag_info = 8;
   optional VertexExecutionContextProto default_execution_context = 9;
+  optional CallerContextProto caller_context = 10;
 }
 
 // DAG monitoring messages

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 268267b..24c20b5 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.api;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -308,4 +309,27 @@ public class TestDAG {
     }
   }
 
+  @Test
+  public void testCallerContext() {
+    DAG dag = DAG.create("dag1");
+    try {
+      CallerContext callerContext = CallerContext.create("ctxt", "", "", "desc");
+      Assert.fail("Expected failure for invalid args");
+    } catch (Exception e) {
+      // Expected
+    }
+    try {
+      CallerContext callerContext = CallerContext.create("", "desc");
+      Assert.fail("Expected failure for invalid args");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    CallerContext callerContext;
+    callerContext = CallerContext.create("ctxt", "a", "a", "desc");
+    callerContext = CallerContext.create("ctxt", "desc");
+    callerContext = CallerContext.create("ctxt", null);
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index d39c47f..c88fa67 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -168,7 +168,7 @@ public class TestTezUtils {
 
     JSONObject jsonObject = new JSONObject(confToJson);
 
-    Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+    Assert.assertFalse(jsonObject.has(ATSConstants.DESC));
     Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
 
     JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
@@ -178,8 +178,8 @@ public class TestTezUtils {
     confToJson = TezUtils.convertToHistoryText(desc, conf);
     jsonObject = new JSONObject(confToJson);
 
-    Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
-    String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+    Assert.assertTrue(jsonObject.has(ATSConstants.DESC));
+    String descFromJson = jsonObject.getString(ATSConstants.DESC);
     Assert.assertEquals(desc, descFromJson);
 
     Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
@@ -201,7 +201,7 @@ public class TestTezUtils {
 
     JSONObject jsonObject = new JSONObject(confToJson);
 
-    Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+    Assert.assertFalse(jsonObject.has(ATSConstants.DESC));
     Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
 
     JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
@@ -213,8 +213,8 @@ public class TestTezUtils {
     confToJson = TezUtils.convertToHistoryText(desc, conf);
     jsonObject = new JSONObject(confToJson);
 
-    Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
-    String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+    Assert.assertTrue(jsonObject.has(ATSConstants.DESC));
+    String descFromJson = jsonObject.getString(ATSConstants.DESC);
     Assert.assertEquals(desc, descFromJson);
 
     Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e41d59c..e165397 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -63,6 +63,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.SessionNotRunning;
@@ -2303,7 +2304,14 @@ public class DAGAppMaster extends AbstractService {
       cumulativeAdditionalResources.putAll(lrDiff);
     }
 
-    LOG.info("Running DAG: " + dagPlan.getName());
+    String callerContextStr = "";
+    if (dagPlan.hasCallerContext()) {
+      CallerContext callerContext = DagTypeConverters.convertCallerContextFromProto(
+          dagPlan.getCallerContext());
+      callerContextStr = ", callerContext=" + callerContext.toString();
+    }
+    LOG.info("Running DAG: " + dagPlan.getName() + callerContextStr);
+
     String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
     System.err.println(timeStamp + " Running Dag: " + newDAG.getID());
     System.out.println(timeStamp + " Running Dag: "+ newDAG.getID());

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 649eb61..bf63045 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -483,6 +483,15 @@ public class HistoryEventJsonConversion {
     JSONObject primaryFilters = new JSONObject();
     primaryFilters.put(ATSConstants.DAG_NAME,
         event.getDAGName());
+    if (event.getDAGPlan().hasCallerContext()
+        && event.getDAGPlan().getCallerContext().hasCallerId()
+        && event.getDAGPlan().getCallerContext().hasCallerType()) {
+      primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID,
+          event.getDAGPlan().getCallerContext().getCallerId());
+      primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE,
+          event.getDAGPlan().getCallerContext().getCallerType());
+    }
+
     jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
 
     // TODO decide whether this goes into different events,
@@ -499,6 +508,14 @@ public class HistoryEventJsonConversion {
     JSONObject otherInfo = new JSONObject();
     otherInfo.put(ATSConstants.DAG_PLAN,
         DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+    if (event.getDAGPlan().hasCallerContext()
+        && event.getDAGPlan().getCallerContext().hasCallerId()
+        && event.getDAGPlan().getCallerContext().hasCallerType()) {
+      otherInfo.put(ATSConstants.CALLER_CONTEXT_ID,
+          event.getDAGPlan().getCallerContext().getCallerId());
+      otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
+          event.getDAGPlan().getCallerContext().getCallerType());
+    }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 76e592e..781120c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -49,10 +49,13 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import com.google.common.base.Preconditions;
+
 public class DAGUtils {
 
   public static final String DAG_NAME_KEY = "dagName";
   public static final String DAG_INFO_KEY = "dagInfo";
+  public static final String DAG_CONTEXT_KEY = "dagContext";
   public static final String VERTICES_KEY = "vertices";
   public static final String EDGES_KEY = "edges";
   public static final String VERTEX_GROUPS_KEY = "vertexGroups";
@@ -165,15 +168,34 @@ public class DAGUtils {
     return object;
   }
 
+  static Map<String, String> createDagInfoMap(DAGPlan dagPlan) {
+    Preconditions.checkArgument(dagPlan.hasCallerContext());
+    Map<String, String> dagInfo = new TreeMap<String, String>();
+    dagInfo.put(ATSConstants.CONTEXT, dagPlan.getCallerContext().getContext());
+    if (dagPlan.getCallerContext().hasCallerId()) {
+      dagInfo.put(ATSConstants.CALLER_ID, dagPlan.getCallerContext().getCallerId());
+    }
+    if (dagPlan.getCallerContext().hasCallerType()) {
+      dagInfo.put(ATSConstants.CALLER_TYPE, dagPlan.getCallerContext().getCallerType());
+    }
+    if (dagPlan.getCallerContext().hasBlob()) {
+      dagInfo.put(ATSConstants.DESCRIPTION, dagPlan.getCallerContext().getBlob());
+    }
+    return dagInfo;
+  }
+
   public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException {
 
     final String VERSION_KEY = "version";
-    final int version = 1;
+    final int version = 2;
     Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
     dagMap.put(DAG_NAME_KEY, dagPlan.getName());
     if (dagPlan.hasDagInfo()) {
       dagMap.put(DAG_INFO_KEY, dagPlan.getDagInfo());
     }
+    if (dagPlan.hasCallerContext()) {
+      dagMap.put(DAG_CONTEXT_KEY, createDagInfoMap(dagPlan));
+    }
     dagMap.put(VERSION_KEY, version);
     ArrayList<Object> verticesList = new ArrayList<Object>();
     for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index cb7e0c8..4d4577a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.CallerContext;
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -53,6 +55,7 @@ import com.google.common.collect.Sets;
 
 public class TestDAGUtils {
 
+  @SuppressWarnings("deprecation")
   private DAGPlan createDAG() {
     // Create a plan with 3 vertices: A, B, C. Group(A,B)->C
     Configuration conf = new Configuration(false);
@@ -71,6 +74,7 @@ public class TestDAGUtils {
         dummyTaskCount, dummyTaskResource);
 
     DAG dag = DAG.create("testDag");
+    dag.setCallerContext(CallerContext.create("context1", "callerId1", "callerType1", "desc1"));
     dag.setDAGInfo("dagInfo");
     String groupName1 = "uv12";
     org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
@@ -113,10 +117,17 @@ public class TestDAGUtils {
     Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
     Assert.assertEquals("testDag", atsMap.get(DAGUtils.DAG_NAME_KEY));
     Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_INFO_KEY));
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_CONTEXT_KEY));
+    Map<String, String> contextMap = (Map<String, String>)atsMap.get(DAGUtils.DAG_CONTEXT_KEY);
+    Assert.assertEquals("context1", contextMap.get(ATSConstants.CONTEXT));
+    Assert.assertEquals("callerId1", contextMap.get(ATSConstants.CALLER_ID));
+    Assert.assertEquals("callerType1", contextMap.get(ATSConstants.CALLER_TYPE));
+    Assert.assertEquals("desc1", contextMap.get(ATSConstants.DESCRIPTION));
+
     Assert.assertEquals("dagInfo", atsMap.get(DAGUtils.DAG_INFO_KEY));
     Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
     Assert.assertTrue(atsMap.containsKey("version"));
-    Assert.assertEquals(1, atsMap.get("version"));
+    Assert.assertEquals(2, atsMap.get("version"));
     Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY));
     Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY));
     Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index 5e89fdc..fff7c1b 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -20,6 +20,7 @@ package org.apache.tez.examples;
 
 import java.io.IOException;
 
+import org.apache.tez.client.CallerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index fb33612..c88c833 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -27,6 +27,8 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.CallerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -138,6 +140,14 @@ public abstract class TezExampleBase extends Configured implements Tool {
   public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException,
       InterruptedException, IOException {
     tezClientInternal.waitTillReady();
+    // Build the example's caller context and attach it to the DAG before it is submitted.
+    CallerContext callerContext = CallerContext.create("TezExamples",
+        "Tez Example DAG: " + dag.getName());
+    ApplicationId appId = tezClientInternal.getAppMasterApplicationId();
+    if (appId != null) {
+      callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication");
+    }
+    dag.setCallerContext(callerContext);
     DAGClient dagClient = tezClientInternal.submitDAG(dag);
     Set<StatusGetOpts> getOpts = Sets.newHashSet();
     if (printCounters) {

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
index 3a24f15..dce79e2 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
@@ -50,10 +50,15 @@ public class Constants extends ATSConstants {
   public static final String INITIALIZER = "initializer";
   public static final String USER_PAYLOAD_TEXT = "userPayloadAsText";
 
+  public static final String DAG_CONTEXT = "dagContext";
+
   //constants for ATS data export
   public static final String DAG = "dag";
   public static final String VERTICES = "vertices";
   public static final String TASKS = "tasks";
   public static final String TASK_ATTEMPTS = "task_attempts";
   public static final String APPLICATION = "application";
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
index 5ea94d6..5fb760c 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections.bidimap.DualHashBidiMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.dag.api.event.VertexState;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -65,6 +66,7 @@ public class DagInfo extends BaseInfo {
   private final String status;
   private final String diagnostics;
   private VersionInfo versionInfo;
+  private CallerContext callerContext;
 
   //VertexID --> VertexName & vice versa
   private final BidiMap vertexNameIDMapping;
@@ -135,10 +137,34 @@ public class DagInfo extends BaseInfo {
   }
 
   private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
+    int version = dagPlan.optInt(Constants.VERSION, 1);
     parseEdges(dagPlan.optJSONArray(Constants.EDGES));
 
     JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES);
     parseBasicVertexInfo(verticesInfo);
+
+    if (version > 1) {
+      parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT));
+    }
+  }
+
+  private void parseDAGContext(JSONObject callerContextInfo) {
+    if (callerContextInfo == null) {
+      LOG.info("No DAG Caller Context available");
+      return;
+    }
+    String context = callerContextInfo.optString(Constants.CONTEXT);
+    String callerId = callerContextInfo.optString(Constants.CALLER_ID);
+    String callerType = callerContextInfo.optString(Constants.CALLER_TYPE);
+    String description = callerContextInfo.optString(Constants.DESCRIPTION);
+
+    this.callerContext = CallerContext.create(context, description);
+    if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) {
+      this.callerContext.setCallerIdAndType(callerId, callerType);
+    } else {
+      LOG.info("No DAG Caller Context Id and Type available");
+    }
+
   }
 
   private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
@@ -329,6 +355,10 @@ public class DagInfo extends BaseInfo {
     return versionInfo;
   }
 
+  public final CallerContext getCallerContext() {
+    return callerContext;
+  }
+
   public final String getName() {
     return name;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index a1b0ba6..dedd9ef 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TaskCounter;
@@ -645,6 +647,16 @@ public class TestHistoryParser {
         Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
 
     TezClient tezClient = getTezClient(withTimeline);
+
+    // Update Caller Context
+    CallerContext callerContext = CallerContext.create("TezExamples", "Tez WordCount Example Job");
+    ApplicationId appId = tezClient.getAppMasterApplicationId();
+    if (appId == null) {
+      appId = ApplicationId.newInstance(1001l, 1);
+    }
+    callerContext.setCallerIdAndType(appId.toString(), "TezApplication");
+    dag.setCallerContext(callerContext);
+
     DAGClient client = tezClient.submitDAG(dag);
     client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
     TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
@@ -690,6 +702,12 @@ public class TestHistoryParser {
     assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
     assertTrue(dagInfo.getTimeTaken() > 0);
 
+    assertNotNull(dagInfo.getCallerContext());
+    assertEquals("TezExamples", dagInfo.getCallerContext().getContext());
+    assertEquals("Tez WordCount Example Job", dagInfo.getCallerContext().getBlob());
+    assertNotNull(dagInfo.getCallerContext().getCallerId());
+    assertEquals("TezApplication", dagInfo.getCallerContext().getCallerType());
+
     //Verify all vertices
     for (VertexInfo vertexInfo : dagInfo.getVertices()) {
       verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 77b00c4..0d6cbcb 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -248,7 +248,7 @@ public class HistoryEventTimelineConversion {
         event.getApplicationAttemptId().getApplicationId().toString());
 
     atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID,
-            event.getContainerId().toString());
+        event.getContainerId().toString());
     atsEntity.setStartTime(event.getLaunchTime());
 
     TimelineEvent launchEvt = new TimelineEvent();
@@ -391,6 +391,15 @@ public class HistoryEventTimelineConversion {
     atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
         event.getDagID().getApplicationId().toString());
 
+    if (event.getDAGPlan().hasCallerContext()
+        && event.getDAGPlan().getCallerContext().hasCallerId()
+        && event.getDAGPlan().getCallerContext().hasCallerType()) {
+      atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID,
+          event.getDAGPlan().getCallerContext().getCallerId());
+      atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_TYPE,
+          event.getDAGPlan().getCallerContext().getCallerType());
+    }
+
     try {
       atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
           DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
@@ -405,6 +414,14 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
     atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
         + event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs());
+    if (event.getDAGPlan().hasCallerContext()
+        && event.getDAGPlan().getCallerContext().hasCallerId()
+        && event.getDAGPlan().getCallerContext().hasCallerType()) {
+      atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID,
+          event.getDAGPlan().getCallerContext().getCallerId());
+      atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE,
+          event.getDAGPlan().getCallerContext().getCallerType());
+    }
 
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 4245be3..0ad1b43 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.counters.TezCounters;
@@ -43,6 +44,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
@@ -112,7 +114,13 @@ public class TestHistoryEventTimelineConversion {
     tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
     tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
     tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
-    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    CallerContextProto.Builder callerContextProto = CallerContextProto.newBuilder();
+    callerContextProto.setContext("ctxt");
+    callerContextProto.setCallerId("Caller_ID");
+    callerContextProto.setCallerType("Caller_Type");
+    callerContextProto.setBlob("Desc_1");
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock")
+        .setCallerContext(callerContextProto).build();
     containerId = ContainerId.newInstance(applicationAttemptId, 111);
     nodeId = NodeId.newInstance("node", 13435);
   }
@@ -425,18 +433,24 @@ public class TestHistoryEventTimelineConversion {
 
     Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
 
-    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size());
 
     Assert.assertTrue(
         timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(
             dagPlan.getName()));
     Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_ID).contains(
+            dagPlan.getCallerContext().getCallerId()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_TYPE).contains(
+            dagPlan.getCallerContext().getCallerType()));
+    Assert.assertTrue(
         timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
             applicationAttemptId.getApplicationId().toString()));
     Assert.assertTrue(
         timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
 
-    Assert.assertEquals(6, timelineEntity.getOtherInfo().size());
+    Assert.assertEquals(8, timelineEntity.getOtherInfo().size());
     Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
     Assert.assertEquals(applicationId.toString(),
         timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
@@ -451,6 +465,14 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(containerLogs,
         timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
             + applicationAttemptId.getAttemptId()));
+    // Expected value goes first, matching the assertEquals(expected, actual) call above.
+    Assert.assertEquals(
+        dagPlan.getCallerContext().getCallerId(),
+        timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_ID));
+    Assert.assertEquals(
+        dagPlan.getCallerContext().getCallerType(),
+        timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE));
+
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index eb56795..6966e8d 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.TezUtils;
@@ -276,7 +277,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     vertices.add(finalReduceVertex);
 
     DAG dag = DAG.create("OrderedWordCount" + dagIndex);
-    dag.setDAGInfo("{ \"context\": \"Tez\", \"description\": \"TestOrderedWordCount Job\" }");
+    dag.setCallerContext(CallerContext.create("Tez", "TestOrderedWordCount Job"));
     for (int i = 0; i < vertices.size(); ++i) {
       dag.addVertex(vertices.get(i));
     }