You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ha...@apache.org on 2017/04/20 10:12:38 UTC

tez git commit: TEZ-3611. Create lightweight summary events for ATS. (harishjp)

Repository: tez
Updated Branches:
  refs/heads/master 59f56a540 -> e9d0b1b26


TEZ-3611. Create lightweight summary events for ATS. (harishjp)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e9d0b1b2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e9d0b1b2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e9d0b1b2

Branch: refs/heads/master
Commit: e9d0b1b266f93a15029159b2da94c5d438d7408b
Parents: 59f56a5
Author: Harish JP <ha...@gmail.com>
Authored: Thu Apr 20 15:42:16 2017 +0530
Committer: Harish JP <ha...@gmail.com>
Committed: Thu Apr 20 15:42:16 2017 +0530

----------------------------------------------------------------------
 .../org/apache/tez/common/ATSConstants.java     |   1 +
 .../tez/dag/history/logging/EntityTypes.java    |   1 +
 .../org/apache/tez/history/ATSImportTool.java   |  19 ++-
 .../logging/ats/TimelineCachePluginImpl.java    |   4 +-
 .../ats/TestTimelineCachePluginImpl.java        |   2 +
 .../ats/ATSV15HistoryLoggingService.java        |  11 +-
 .../ats/TestATSV15HistoryLoggingService.java    |  10 +-
 .../logging/ats/ATSHistoryLoggingService.java   |   8 +-
 .../ats/HistoryEventTimelineConversion.java     | 150 ++++++++++-------
 .../ats/TestATSHistoryLoggingService.java       |   8 +-
 .../ats/TestHistoryEventTimelineConversion.java | 168 ++++++++++++++++---
 11 files changed, 282 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 25c802e..6e07849 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -47,6 +47,7 @@ public class ATSConstants {
   public static final String USER = "user";
   public static final String CALLER_CONTEXT_ID = "callerId";
   public static final String CALLER_CONTEXT_TYPE = "callerType";
+  public static final String CALLER_CONTEXT = "callerContext";
 
   /* Keys used in other info */
   public static final String APP_SUBMIT_TIME = "appSubmitTime";

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
index e2f0882..6f6205d 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -23,6 +23,7 @@ public enum EntityTypes {
   TEZ_APPLICATION_ATTEMPT,
   TEZ_CONTAINER_ID,
   TEZ_DAG_ID,
+  TEZ_DAG_EXTRA_INFO,
   TEZ_VERTEX_ID,
   TEZ_TASK_ID,
   TEZ_TASK_ATTEMPT_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
index 3efeb5a..fee226a 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
@@ -35,7 +35,6 @@ import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -50,6 +49,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.history.parser.datamodel.Constants;
 import org.apache.tez.history.parser.utils.Utils;
@@ -68,6 +68,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.util.Iterator;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -191,6 +192,22 @@ public class ATSImportTool extends Configured implements Tool {
     //Download dag
     String dagUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_DAG_ID, dagId);
     JSONObject dagRoot = getJsonRootEntity(dagUrl);
+
+    // DAG extra info is stored in ATS as a separate entity. If the fetched entity has other
+    // info, copy each of its entries into the other info of the dag object.
+    String dagExtraInfoUrl = String.format("%s/%s/%s", baseUri, EntityTypes.TEZ_DAG_EXTRA_INFO,
+        dagId);
+    JSONObject dagExtraInfo = getJsonRootEntity(dagExtraInfoUrl);
+    if (dagExtraInfo.has(Constants.OTHER_INFO)) {
+      JSONObject dagOtherInfo = dagRoot.getJSONObject(Constants.OTHER_INFO);
+      JSONObject extraOtherInfo = dagExtraInfo.getJSONObject(Constants.OTHER_INFO);
+      @SuppressWarnings("unchecked")
+      Iterator<String> iter = extraOtherInfo.keys();
+      while (iter.hasNext()) {
+        String key = iter.next();
+        dagOtherInfo.put(key, extraOtherInfo.get(key));
+      }
+    }
     finalJson.put(Constants.DAG, dagRoot);
 
     //Create a zip entry with dagId as its name.

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
index 8269714..d211feb 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
@@ -50,6 +50,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implement
   static {
     knownEntityTypes = Sets.newHashSet(
         EntityTypes.TEZ_DAG_ID.name(),
+        EntityTypes.TEZ_DAG_EXTRA_INFO.name(),
         EntityTypes.TEZ_VERTEX_ID.name(),
         EntityTypes.TEZ_TASK_ID.name(),
         EntityTypes.TEZ_TASK_ATTEMPT_ID.name(),
@@ -84,7 +85,8 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implement
         || entityId == null || entityId.isEmpty()) {
       return null;
     }
-    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
+    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name()) ||
+        entityType.equals(EntityTypes.TEZ_DAG_EXTRA_INFO.name())) {
       TezDAGID dagId = TezDAGID.fromString(entityId);
       if (dagId != null) {
         return createTimelineEntityGroupIds(dagId);

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
index 3d1af63..1bfa0a1 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
@@ -101,12 +101,14 @@ public class TestTimelineCachePluginImpl {
 
     typeIdMap1 = new HashMap<String, String>();
     typeIdMap1.put(EntityTypes.TEZ_DAG_ID.name(), dagID1.toString());
+    typeIdMap1.put(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), dagID1.toString());
     typeIdMap1.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID1.toString());
     typeIdMap1.put(EntityTypes.TEZ_TASK_ID.name(), taskID1.toString());
     typeIdMap1.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID1.toString());
 
     typeIdMap2 = new HashMap<String, String>();
     typeIdMap2.put(EntityTypes.TEZ_DAG_ID.name(), dagID2.toString());
+    typeIdMap2.put(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), dagID2.toString());
     typeIdMap2.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID2.toString());
     typeIdMap2.put(EntityTypes.TEZ_TASK_ID.name(), taskID2.toString());
     typeIdMap2.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID2.toString());

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index a095cbc..a71f0d8 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.logging.ats;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -375,16 +376,20 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
     if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
       return;
     }
-
-    TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+    TimelineEntityGroupId groupId = getGroupId(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(
         event.getHistoryEvent());
+    for (TimelineEntity entity : entities) {
+      logEntity(groupId, entity, domainId);
+    }
+  }
 
+  private void logEntity(TimelineEntityGroupId groupId, TimelineEntity entity, String domainId) {
     if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
       historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
     }
 
     try {
-      TimelineEntityGroupId groupId = getGroupId(event);
       TimelinePutResponse response = timelineClient.putEntities(
           appContext.getApplicationAttemptId(), groupId, entity);
       if (response != null

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index ef5da81..96c3c80 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -111,7 +111,7 @@ public class TestATSV15HistoryLoggingService {
     List<TimelineEntity> nonGroupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
     assertNotNull(nonGroupedDagEvents);
-    assertEquals(4, nonGroupedDagEvents.size());
+    assertEquals(5, nonGroupedDagEvents.size());
 
     service.stop();
   }
@@ -139,7 +139,7 @@ public class TestATSV15HistoryLoggingService {
     List<TimelineEntity> nonGroupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
     assertNotNull(nonGroupedDagEvents);
-    assertEquals(4, nonGroupedDagEvents.size());
+    assertEquals(5, nonGroupedDagEvents.size());
 
     service.stop();
   }
@@ -185,7 +185,7 @@ public class TestATSV15HistoryLoggingService {
     List<TimelineEntity> groupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId1.getGroupId(numDagsPerGroup)));
     assertNotNull(groupedDagEvents);
-    assertEquals(8, groupedDagEvents.size());
+    assertEquals(10, groupedDagEvents.size());
 
     nonGroupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId3.toString()));
@@ -194,7 +194,7 @@ public class TestATSV15HistoryLoggingService {
     groupedDagEvents = entityLog.get(
         TimelineEntityGroupId.newInstance(appId, dagId3.getGroupId(numDagsPerGroup)));
     assertNotNull(groupedDagEvents);
-    assertEquals(4, groupedDagEvents.size());
+    assertEquals(5, groupedDagEvents.size());
 
     service.stop();
   }
@@ -338,7 +338,7 @@ public class TestATSV15HistoryLoggingService {
 
     // calls were made with correct domain ids.
     verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
-    verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-id"));
+    verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("dag-id"));
 
     service.stop();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index dc215fd..6d035cc 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -336,11 +336,13 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
       if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
         continue;
       }
-      TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+      List<TimelineEntity> eventEntities = HistoryEventTimelineConversion.convertToTimelineEntities(
           event.getHistoryEvent());
-      entities.add(entity);
+      entities.addAll(eventEntities);
       if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
-        historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
+        for (TimelineEntity entity: eventEntities) {
+          historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index faccc98..235a292 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -19,7 +19,9 @@
 package org.apache.tez.dag.history.logging.ats;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -31,6 +33,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
 import org.apache.tez.dag.app.web.AMWebController;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -56,82 +59,82 @@ import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezVertexID;
 
+import com.google.common.collect.Lists;
+
 public class HistoryEventTimelineConversion {
 
-  public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
-    if (!historyEvent.isHistoryEvent()) {
+  private static void validateEvent(HistoryEvent event) {
+    if (!event.isHistoryEvent()) {
       throw new UnsupportedOperationException("Invalid Event, does not support history"
-          + ", eventType=" + historyEvent.getEventType());
+          + ", eventType=" + event.getEventType());
     }
-    TimelineEntity timelineEntity;
+  }
+
+  public static List<TimelineEntity> convertToTimelineEntities(HistoryEvent historyEvent) {
+    validateEvent(historyEvent);
     switch (historyEvent.getEventType()) {
       case APP_LAUNCHED:
-        timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
-        break;
+        return Collections.singletonList(convertAppLaunchedEvent((AppLaunchedEvent) historyEvent));
       case AM_LAUNCHED:
-        timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
-        break;
+        return Collections.singletonList(convertAMLaunchedEvent((AMLaunchedEvent) historyEvent));
       case AM_STARTED:
-        timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent);
-        break;
+        return Collections.singletonList(convertAMStartedEvent((AMStartedEvent) historyEvent));
       case CONTAINER_LAUNCHED:
-        timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent));
       case CONTAINER_STOPPED:
-        timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent));
       case DAG_SUBMITTED:
-        timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
-        break;
+        return Lists.newArrayList(
+            convertDAGSubmittedToDAGExtraInfoEntity((DAGSubmittedEvent)historyEvent),
+            convertDAGSubmittedEvent((DAGSubmittedEvent)historyEvent));
       case DAG_INITIALIZED:
-        timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertDAGInitializedEvent((DAGInitializedEvent) historyEvent));
       case DAG_STARTED:
-        timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
-        break;
+        return Collections.singletonList(convertDAGStartedEvent((DAGStartedEvent) historyEvent));
       case DAG_FINISHED:
-        timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
-        break;
+        return Lists.newArrayList(
+            convertDAGFinishedToDAGExtraInfoEntity((DAGFinishedEvent) historyEvent),
+            convertDAGFinishedEvent((DAGFinishedEvent) historyEvent));
       case VERTEX_INITIALIZED:
-        timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertVertexInitializedEvent((VertexInitializedEvent) historyEvent));
       case VERTEX_STARTED:
-        timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertVertexStartedEvent((VertexStartedEvent) historyEvent));
       case VERTEX_FINISHED:
-        timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
-      break;
+        return Collections.singletonList(
+            convertVertexFinishedEvent((VertexFinishedEvent) historyEvent));
       case TASK_STARTED:
-        timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
-        break;
+        return Collections.singletonList(convertTaskStartedEvent((TaskStartedEvent) historyEvent));
       case TASK_FINISHED:
-        timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertTaskFinishedEvent((TaskFinishedEvent) historyEvent));
       case TASK_ATTEMPT_STARTED:
-        timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent));
       case TASK_ATTEMPT_FINISHED:
-        timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent));
       case VERTEX_CONFIGURE_DONE:
-        timelineEntity = convertVertexReconfigureDoneEvent(
-            (VertexConfigurationDoneEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent));
       case DAG_RECOVERED:
-        timelineEntity = convertDAGRecoveredEvent(
-            (DAGRecoveredEvent) historyEvent);
-        break;
+        return Collections.singletonList(
+            convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent));
       case VERTEX_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_FINISHED:
       case DAG_COMMIT_STARTED:
+      case DAG_KILL_REQUEST:
         throw new UnsupportedOperationException("Invalid Event, does not support history"
             + ", eventType=" + historyEvent.getEventType());
-      default:
-        throw new UnsupportedOperationException("Unhandled Event"
-            + ", eventType=" + historyEvent.getEventType());
+        // No default case on purpose: a newly added event type then triggers a switch warning.
     }
-    return timelineEntity;
+    throw new UnsupportedOperationException("Unhandled Event, eventType=" +
+        historyEvent.getEventType());
   }
 
   private static TimelineEntity convertDAGRecoveredEvent(DAGRecoveredEvent event) {
@@ -309,8 +312,6 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
     atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
     atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
-    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
     atsEntity.addOtherInfo(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
         event.getApplicationAttemptId().toString());
 
@@ -324,6 +325,24 @@ public class HistoryEventTimelineConversion {
     return atsEntity;
   }
 
+  private static TimelineEntity convertDAGFinishedToDAGExtraInfoEntity(DAGFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_EXTRA_INFO.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), event.getDagID().toString());
+
+    TimelineEvent submitEvt = new TimelineEvent();
+    submitEvt.setEventType(HistoryEventType.DAG_FINISHED.name());
+    submitEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(submitEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    return atsEntity;
+  }
+
+
   private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(event.getDagID().toString());
@@ -397,19 +416,15 @@ public class HistoryEventTimelineConversion {
 
     if (event.getDAGPlan().hasCallerContext()
         && event.getDAGPlan().getCallerContext().hasCallerId()) {
-      atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID,
-          event.getDAGPlan().getCallerContext().getCallerId());
+      CallerContextProto callerContext = event.getDAGPlan().getCallerContext();
+      atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId());
+      atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId());
+      atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT, callerContext.getContext());
     }
     if (event.getQueueName() != null) {
       atsEntity.addPrimaryFilter(ATSConstants.DAG_QUEUE_NAME, event.getQueueName());
     }
 
-    try {
-      atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
-          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
     atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
         event.getApplicationAttemptId().getApplicationId().toString());
     atsEntity.addOtherInfo(ATSConstants.APPLICATION_ATTEMPT_ID,
@@ -433,6 +448,29 @@ public class HistoryEventTimelineConversion {
     return atsEntity;
   }
 
+  private static TimelineEntity convertDAGSubmittedToDAGExtraInfoEntity(DAGSubmittedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_EXTRA_INFO.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), event.getDagID().toString());
+
+    TimelineEvent submitEvt = new TimelineEvent();
+    submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name());
+    submitEvt.setTimestamp(event.getSubmitTime());
+    atsEntity.addEvent(submitEvt);
+
+    atsEntity.setStartTime(event.getSubmitTime());
+
+    try {
+      atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    return atsEntity;
+  }
+
   private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(event.getTaskAttemptID().toString());

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index a641cda..6603f4f 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -248,7 +248,7 @@ public class TestATSHistoryLoggingService {
       .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
 
     // All calls made with session domain id.
-    verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+    verify(historyACLPolicyManager, times(6)).updateTimelineEntityDomain(any(), eq("session-id"));
   }
 
   @Test(timeout=10000)
@@ -299,7 +299,7 @@ public class TestATSHistoryLoggingService {
 
     // All calls made with session domain id.
     verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
-    Assert.assertEquals(5, atsEntitiesCounter);
+    Assert.assertEquals(6, atsEntitiesCounter);
   }
 
   @Test(timeout=10000)
@@ -333,7 +333,7 @@ public class TestATSHistoryLoggingService {
 
     // All calls made with session domain id.
     verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("test-domain"));
-    verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-domain"));
+    verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("dag-domain"));
   }
 
   @Test(timeout=10000)
@@ -433,7 +433,7 @@ public class TestATSHistoryLoggingService {
 
     // All calls made with session domain id.
     verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
-    Assert.assertEquals(5, atsEntitiesCounter);
+    Assert.assertEquals(6, atsEntitiesCounter);
   }
 
   private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,

http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 28fd5da..1663cb0 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -221,7 +221,7 @@ public class TestHistoryEventTimelineConversion {
       if (event == null || !event.isHistoryEvent()) {
         continue;
       }
-      HistoryEventTimelineConversion.convertToTimelineEntity(event);
+      HistoryEventTimelineConversion.convertToTimelineEntities(event);
     }
   }
 
@@ -259,7 +259,7 @@ public class TestHistoryEventTimelineConversion {
       MockVersionInfo mockVersionInfo = new MockVersionInfo();
       AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
           submitTime, user, conf, mockVersionInfo);
-      HistoryEventTimelineConversion.convertToTimelineEntity(event);
+      HistoryEventTimelineConversion.convertToTimelineEntities(event);
     } finally {
       shutdown.set(true);
       confChanger.join();
@@ -279,7 +279,9 @@ public class TestHistoryEventTimelineConversion {
     AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
         submitTime, user, conf, mockVersionInfo);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
 
     Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
 
@@ -322,7 +324,9 @@ public class TestHistoryEventTimelineConversion {
     long submitTime = random.nextLong();
     AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
 
     Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType());
@@ -357,7 +361,9 @@ public class TestHistoryEventTimelineConversion {
 
     AMStartedEvent event = new AMStartedEvent(applicationAttemptId, startTime, user);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
 
     Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType());
@@ -383,7 +389,9 @@ public class TestHistoryEventTimelineConversion {
     long launchTime = random.nextLong();
     ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
         applicationAttemptId);
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
 
     Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
@@ -414,7 +422,9 @@ public class TestHistoryEventTimelineConversion {
     int exitStatus = random.nextInt();
     ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus,
         applicationAttemptId);
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
 
     Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
@@ -446,7 +456,10 @@ public class TestHistoryEventTimelineConversion {
     long startTime = random.nextLong();
     String dagName = "testDagName";
     DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName);
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
 
     Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
@@ -477,7 +490,21 @@ public class TestHistoryEventTimelineConversion {
     DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
         applicationAttemptId, null, user, null, containerLogs, queueName);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(2, entities.size());
+
+
+    if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) {
+      assertDagSubmittedEntity(submitTime, event, entities.get(0));
+      assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(1));
+    } else {
+      assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(0));
+      assertDagSubmittedEntity(submitTime, event, entities.get(1));
+    }
+  }
+
+  private void assertDagSubmittedEntity(long submitTime, DAGSubmittedEvent event,
+      TimelineEntity timelineEntity) {
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
 
@@ -511,10 +538,9 @@ public class TestHistoryEventTimelineConversion {
         timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
     Assert.assertTrue(
         timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_QUEUE_NAME)
-            .contains(queueName));
+            .contains(event.getQueueName()));
 
     Assert.assertEquals(9, timelineEntity.getOtherInfo().size());
-    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
     Assert.assertEquals(applicationId.toString(),
         timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
     Assert.assertEquals(applicationAttemptId.toString(),
@@ -534,9 +560,31 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(
         timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE),
             dagPlan.getCallerContext().getCallerType());
+    Assert.assertEquals(dagPlan.getCallerContext().getContext(),
+        timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT));
     Assert.assertEquals(
-        queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME));
+        event.getQueueName(), timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME));
+
+  }
+
+  private void assertDagSubmittedExtraInfoEntity(long submitTime, DAGSubmittedEvent event,
+      TimelineEntity timelineEntity) {
+    Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(timelineEntity.getRelatedEntities()
+        .get(EntityTypes.TEZ_DAG_ID.name()).contains(tezDAGID.toString()));
 
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(submitTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
+    Assert.assertEquals(0, timelineEntity.getPrimaryFilters().size());
+    Assert.assertEquals(1, timelineEntity.getOtherInfo().size());
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
   }
 
   @SuppressWarnings("unchecked")
@@ -561,7 +609,10 @@ public class TestHistoryEventTimelineConversion {
     TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
         startTime, finishTime, state, TaskFailureType.FATAL, error, diagnostics, counters, events, null, creationTime,
         tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
 
@@ -613,8 +664,11 @@ public class TestHistoryEventTimelineConversion {
             creationTime,
             tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL",
             "nodeHttpAddress");
-    TimelineEntity timelineEntityWithNullFailureType =
-        HistoryEventTimelineConversion.convertToTimelineEntity(eventWithNullFailureType);
+    List<TimelineEntity> evtEntities = HistoryEventTimelineConversion.convertToTimelineEntities(
+        eventWithNullFailureType);
+    Assert.assertEquals(1, evtEntities.size());
+    TimelineEntity timelineEntityWithNullFailureType = evtEntities.get(0);
+
     Assert.assertNull(
         timelineEntityWithNullFailureType.getOtherInfo().get(ATSConstants.TASK_FAILURE_TYPE));
   }
@@ -630,7 +684,11 @@ public class TestHistoryEventTimelineConversion {
     DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
         nameIdMap);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
 
@@ -671,7 +729,20 @@ public class TestHistoryEventTimelineConversion {
     DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
         "diagnostics", null, user, dagPlan.getName(), taskStats, applicationAttemptId, dagPlan);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(2, entities.size());
+
+    if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) {
+      assertDagFinishedEntity(finishTime, startTime, event, entities.get(0));
+      assertDagFinishedExtraInfoEntity(finishTime, entities.get(1));
+    } else {
+      assertDagFinishedExtraInfoEntity(finishTime, entities.get(0));
+      assertDagFinishedEntity(finishTime, startTime, event, entities.get(1));
+    }
+  }
+
+  private void assertDagFinishedEntity(long finishTime, long startTime, DAGFinishedEvent event,
+      TimelineEntity timelineEntity) {
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
 
@@ -703,7 +774,6 @@ public class TestHistoryEventTimelineConversion {
         ((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
     Assert.assertEquals(finishTime - startTime,
         ((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
-    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
     Assert.assertEquals(DAGState.ERROR.name(),
         timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
     Assert.assertEquals("diagnostics",
@@ -717,6 +787,23 @@ public class TestHistoryEventTimelineConversion {
         ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
   }
 
+  private void assertDagFinishedExtraInfoEntity(long finishTime, TimelineEntity timelineEntity) {
+    Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.TEZ_DAG_ID).contains(
+            tezDAGID.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testConvertVertexInitializedEvent() {
@@ -731,7 +818,11 @@ public class TestHistoryEventTimelineConversion {
             .setTaskSchedulerClassName("def1")
             .setTaskCommunicatorClassName("ghi1"));
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
 
@@ -794,7 +885,10 @@ public class TestHistoryEventTimelineConversion {
 
     VertexStartedEvent event = new VertexStartedEvent(tezVertexID, startRequestedTime, startTime);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
 
@@ -844,7 +938,10 @@ public class TestHistoryEventTimelineConversion {
             .setTaskSchedulerClassName("def1")
             .setTaskCommunicatorClassName("ghi1"));
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
 
@@ -909,7 +1006,10 @@ public class TestHistoryEventTimelineConversion {
     long startTime = random.nextLong();
     TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
 
@@ -953,7 +1053,10 @@ public class TestHistoryEventTimelineConversion {
     TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
         startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
 
@@ -1009,7 +1112,9 @@ public class TestHistoryEventTimelineConversion {
 
     TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime,
         tezTaskAttemptID, state, diagnostics, counters, 3);
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
 
     Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
@@ -1054,7 +1159,10 @@ public class TestHistoryEventTimelineConversion {
     VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null,
         edgeMgrs, null, true);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
     Assert.assertEquals(vId.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(1, timelineEntity.getEvents().size());
@@ -1092,7 +1200,10 @@ public class TestHistoryEventTimelineConversion {
     DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
         dagPlan.getName(), user, recoverTime, containerLogs);
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
 
@@ -1128,7 +1239,10 @@ public class TestHistoryEventTimelineConversion {
         dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason", containerLogs);
 
 
-    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+    Assert.assertEquals(1, entities.size());
+    TimelineEntity timelineEntity = entities.get(0);
+
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
     Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());