You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:47:47 UTC

[06/50] [abbrv] tez git commit: TEZ-3267. Publish queue name to ATS as part of dag summary. Contributed by Harish Jaiprakash.

TEZ-3267. Publish queue name to ATS as part of dag summary. Contributed by Harish Jaiprakash.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/16b93de8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/16b93de8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/16b93de8

Branch: refs/heads/TEZ-1190
Commit: 16b93de8f31a815cab63e0be0dc563549a688566
Parents: 11815a7
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 8 18:32:26 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 8 18:32:26 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ATSConstants.java     |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 11 ++++++++-
 .../dag/history/events/DAGSubmittedEvent.java   | 17 +++++++++++--
 .../impl/HistoryEventJsonConversion.java        |  3 +++
 tez-dag/src/main/proto/HistoryEvents.proto      |  1 +
 .../apache/tez/dag/app/TestRecoveryParser.java  | 26 ++++++++++----------
 .../dag/history/TestHistoryEventHandler.java    |  2 +-
 .../TestHistoryEventsProtoConversion.java       |  4 ++-
 .../impl/TestHistoryEventJsonConversion.java    | 12 +++++++--
 .../history/recovery/TestRecoveryService.java   |  2 +-
 .../ats/acls/TestATSHistoryWithACLs.java        |  4 +--
 .../ats/TestATSV15HistoryLoggingService.java    |  2 +-
 .../ats/HistoryEventTimelineConversion.java     |  3 +++
 .../ats/TestATSHistoryLoggingService.java       |  2 +-
 .../ats/TestHistoryEventTimelineConversion.java | 10 +++++---
 16 files changed, 72 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0949339..215cb08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3267. Publish queue name to ATS as part of dag summary.
   TEZ-3609. Improve ATSv15 performance for DAG entities read calls.
   TEZ-3244. Allow overlap of input and output memory when they are not concurrent
   TEZ-3581. Add different logger to enable suppressing logs for specific lines.

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index c56582c..03c9fa1 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -55,6 +55,7 @@ public class ATSConstants {
   public static final String DAG_PLAN = "dagPlan";
   public static final String DAG_NAME = "dagName";
   public static final String DAG_STATE = "dagState";
+  public static final String DAG_SUBMITTED_QUEUE_NAME = "submittedQueueName";
   public static final String DAG_AM_WEB_SERVICE_VERSION = "amWebServiceVersion";
   public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason";
   public static final String VERTEX_NAME = "vertexName";

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index eaaf18b..7f27064 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -2578,7 +2578,7 @@ public class DAGAppMaster extends AbstractService {
     // for an app later
     final DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
         submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
-        newDAG.getUserName(), newDAG.getConf(), containerLogs);
+        newDAG.getUserName(), newDAG.getConf(), containerLogs, getSubmittedQueueName());
     boolean dagLoggingEnabled = newDAG.getConf().getBoolean(
         TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
         TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT);
@@ -2671,6 +2671,15 @@ public class DAGAppMaster extends AbstractService {
     });
   }
 
+  private String getSubmittedQueueName() {
+    // TODO: Replace this with constant once the yarn patch is backported. (JIRA: TEZ-3279)
+    String submittedQueueName = System.getenv("YARN_RESOURCEMANAGER_APPLICATION_QUEUE");
+    if (submittedQueueName == null) {
+      submittedQueueName = amConf.get(TezConfiguration.TEZ_QUEUE_NAME);
+    }
+    return submittedQueueName;
+  }
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     dispatcher.getEventHandler().handle(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 07d7c07..1b1fdf3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -57,6 +57,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   private boolean historyLoggingEnabled = true;
   private Configuration conf;
   private String containerLogs;
+  private String queueName;
 
   public DAGSubmittedEvent() {
   }
@@ -64,7 +65,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
       DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
       Map<String, LocalResource> cumulativeAdditionalLocalResources,
-      String user, Configuration conf, String containerLogs) {
+      String user, Configuration conf, String containerLogs, String queueName) {
     this.dagID = dagID;
     this.dagName = dagPlan.getName();
     this.submitTime = submitTime;
@@ -74,6 +75,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     this.user = user;
     this.conf = conf;
     this.containerLogs = containerLogs;
+    this.queueName = queueName;
   }
 
   @Override
@@ -97,6 +99,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
         .setApplicationAttemptId(applicationAttemptId.toString())
         .setDagPlan(dagPlan)
         .setSubmitTime(submitTime);
+    if (queueName != null) {
+      builder.setQueueName(queueName);
+    }
     if (cumulativeAdditionalLocalResources != null && !cumulativeAdditionalLocalResources.isEmpty()) {
       builder.setCumulativeAdditionalAmResources(DagTypeConverters
           .convertFromLocalResources(cumulativeAdditionalLocalResources));
@@ -111,6 +116,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     this.submitTime = proto.getSubmitTime();
     this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
         proto.getApplicationAttemptId());
+    if (proto.hasQueueName()) {
+      this.queueName = proto.getQueueName();
+    }
     if (proto.hasCumulativeAdditionalAmResources()) {
       this.cumulativeAdditionalLocalResources = DagTypeConverters.convertFromPlanLocalResources(proto
           .getCumulativeAdditionalAmResources());
@@ -134,7 +142,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   @Override
   public String toString() {
     return "dagID=" + dagID
-        + ", submitTime=" + submitTime;
+        + ", submitTime=" + submitTime
+        + ", queueName=" + queueName;
   }
 
   @Override
@@ -203,4 +212,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   public String getContainerLogs() {
     return containerLogs;
   }
+
+  public String getQueueName() {
+    return queueName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index a767fbf..69c40e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -515,6 +515,9 @@ public class HistoryEventJsonConversion {
       otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
           event.getDAGPlan().getCallerContext().getCallerType());
     }
+    if (event.getQueueName() != null) {
+      otherInfo.put(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName());
+    }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index ff3707d..7671469 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -54,6 +54,7 @@ message DAGSubmittedProto {
   optional int64 submit_time = 3;
   optional string application_attempt_id = 4;
   optional PlanLocalResourcesProto cumulative_additional_am_resources = 5;
+  optional string queue_name = 6;
 }
 
 message DAGInitializedProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index f4edf9e..6673b39 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -169,7 +169,7 @@ public class TestRecoveryParser {
     rService.start();
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
     // only for testing, DAGCommitStartedEvent is not supposed to happen at this time.
@@ -215,7 +215,7 @@ public class TestRecoveryParser {
     rService.start();
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
     rService.handle(new DAGHistoryEvent(dagID,
@@ -264,7 +264,7 @@ public class TestRecoveryParser {
     rService.start();
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread
     rService.await();
     rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
@@ -310,7 +310,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // write an corrupted SummaryEvent
     rService.summaryStream.writeChars("INVALID_DATA");
     rService.stop();
@@ -344,7 +344,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGCommitStartedEvent(dagID, 0L)));
@@ -376,7 +376,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGCommitStartedEvent(dagID, 0L)));
@@ -412,7 +412,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     rService.handle(new DAGHistoryEvent(dagID,
         new VertexCommitStartedEvent(TezVertexID.getInstance(dagID, 0), 0L)));
@@ -445,7 +445,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
     rService.handle(new DAGHistoryEvent(dagID,
@@ -482,7 +482,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     rService.handle(new DAGHistoryEvent(dagID,
         new VertexGroupCommitStartedEvent(dagID, "group_1", 
@@ -516,7 +516,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     TezVertexID v0 = TezVertexID.getInstance(dagID, 0);
     TezVertexID v1 = TezVertexID.getInstance(dagID, 1);
@@ -565,7 +565,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
     rService.handle(new DAGHistoryEvent(dagID,
@@ -601,7 +601,7 @@ public class TestRecoveryParser {
     // write a DAGSubmittedEvent first to initialize summaryStream
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     // It should be fine to skip other events, just for testing.
     TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
     rService.handle(new DAGHistoryEvent(dagID,
@@ -640,7 +640,7 @@ public class TestRecoveryParser {
     // DAG  DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null)));
+            null, "user", new Configuration(), null, null)));
     DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L, 
         "user", "dagName", null);
     DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName");

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
index 4c0fe3f..5a71a42 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
@@ -219,7 +219,7 @@ public class TestHistoryEventHandler {
         new AMStartedEvent(attemptId, time, user)));
     historyEvents.add(new DAGHistoryEvent(dagId,
         new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
-            conf, null)));
+            conf, null, "default")));
     TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
     historyEvents.add(new DAGHistoryEvent(dagId,
         new VertexStartedEvent(vertexID, time, time)));

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 3d29a5d..47d8389 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -171,12 +171,13 @@ public class TestHistoryEventsProtoConversion {
     logEvents(event, deserializedEvent);
   }
 
+  private final String QUEUE_NAME = "TEST_QUEUE_NAME";
   private void testDAGSubmittedEvent() throws Exception {
     DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance(
         ApplicationId.newInstance(0, 1), 1), 1001l,
         DAGPlan.newBuilder().setName("foo").build(),
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), null, "", null, null);
+            ApplicationId.newInstance(0, 1), 1), null, "", null, null, QUEUE_NAME);
     DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -189,6 +190,7 @@ public class TestHistoryEventsProtoConversion {
         deserializedEvent.getSubmitTime());
     Assert.assertEquals(event.getDAGPlan(),
         deserializedEvent.getDAGPlan());
+    Assert.assertEquals(event.getQueueName(), deserializedEvent.getQueueName());
     logEvents(event, deserializedEvent);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 9477118..1bbecd3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -124,7 +124,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user, null, null);
+              null, user, null, null, "Q_" + eventType.name());
           break;
         case DAG_INITIALIZED:
           event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
@@ -200,7 +200,15 @@ public class TestHistoryEventJsonConversion {
       if (event == null || !event.isHistoryEvent()) {
         continue;
       }
-      HistoryEventJsonConversion.convertToJson(event);
+      JSONObject json = HistoryEventJsonConversion.convertToJson(event);
+      if (eventType == HistoryEventType.DAG_SUBMITTED) {
+        try {
+          Assert.assertEquals("Q_" + eventType.name(), json.getJSONObject(ATSConstants.OTHER_INFO)
+              .getString(ATSConstants.DAG_SUBMITTED_QUEUE_NAME));
+        } catch (JSONException ex) {
+          Assert.fail("Exception: " + ex.getMessage() + " for type: " + eventType);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
index 3dec1d7..790e2d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -333,7 +333,7 @@ public class TestRecoveryService {
     DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build();
     // This writes to recovery immediately.
     recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(
-        dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null)));
+        dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null, "default")));
     waitForDrain(-1);
     verify(summaryFos, times(1)).hflush();
     verify(dagFos, times(1)).hflush();

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
index 6b3ebd7..8e5c95c 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -401,7 +401,7 @@ public class TestATSHistoryWithACLs {
     DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
           1, dagPlan, appAttemptId, null,
-          "usr", tezConf, null);
+          "usr", tezConf, null, null);
     submittedEvent.setHistoryLoggingEnabled(false);
     DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
     historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));
@@ -446,7 +446,7 @@ public class TestATSHistoryWithACLs {
     DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
             1, dagPlan, appAttemptId, null,
-            "usr", tezConf, null);
+            "usr", tezConf, null, null);
     submittedEvent.setHistoryLoggingEnabled(true);
     DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
     historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index 9111195..cbded35 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -450,7 +450,7 @@ public class TestATSV15HistoryLoggingService {
         new AMStartedEvent(attemptId, time, user)));
     historyEvents.add(new DAGHistoryEvent(dagId,
         new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
-            conf, null)));
+            conf, null, "default")));
     TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
     historyEvents.add(new DAGHistoryEvent(dagId,
         new VertexStartedEvent(vertexID, time, time)));

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 96239c3..8d0c547 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -423,6 +423,9 @@ public class HistoryEventTimelineConversion {
       atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE,
           event.getDAGPlan().getCallerContext().getCallerType());
     }
+    if (event.getQueueName() != null) {
+      atsEntity.addOtherInfo(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName());
+    }
 
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index da57eb2..a641cda 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -444,7 +444,7 @@ public class TestATSHistoryLoggingService {
     Configuration conf = new Configuration(service.getConfig());
     historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user")));
     historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
-        DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null)));
+        DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null, "default")));
     TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
     historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
     TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 62fb335..bb189d3 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -145,7 +145,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user, null, containerLogs);
+              null, user, null, containerLogs, null);
           break;
         case DAG_INITIALIZED:
           event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
@@ -473,8 +473,9 @@ public class TestHistoryEventTimelineConversion {
   public void testConvertDAGSubmittedEvent() {
     long submitTime = random.nextLong();
 
+    final String queueName = "TEST_DAG_SUBMITTED";
     DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
-        applicationAttemptId, null, user, null, containerLogs);
+        applicationAttemptId, null, user, null, containerLogs, queueName);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
@@ -509,7 +510,7 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertTrue(
         timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
 
-    Assert.assertEquals(8, timelineEntity.getOtherInfo().size());
+    Assert.assertEquals(9, timelineEntity.getOtherInfo().size());
     Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
     Assert.assertEquals(applicationId.toString(),
         timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
@@ -530,7 +531,8 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(
         timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE),
             dagPlan.getCallerContext().getCallerType());
-
+    Assert.assertEquals(
+        queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_SUBMITTED_QUEUE_NAME));
 
   }