You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:16 UTC

[07/50] [abbrv] tez git commit: TEZ-1716. Additional ATS data for UI. (hitesh)

TEZ-1716. Additional ATS data for UI. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ede0e645
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ede0e645
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ede0e645

Branch: refs/heads/TEZ-8
Commit: ede0e645a9f4fdda44842a46b7fcf9edecdf50b2
Parents: a6c8006
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Oct 30 08:06:29 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Oct 30 08:06:29 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/ATSConstants.java     |   5 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  11 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  56 +++++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +
 .../tez/dag/history/HistoryEventType.java       |   1 +
 .../dag/history/events/AppLaunchedEvent.java    | 104 ++++++++++
 .../dag/history/events/DAGFinishedEvent.java    |  11 +-
 .../dag/history/events/DAGInitializedEvent.java |  10 +-
 .../dag/history/events/DAGSubmittedEvent.java   |   8 +-
 .../dag/history/events/TaskFinishedEvent.java   |   1 +
 .../tez/dag/history/logging/EntityTypes.java    |   1 +
 .../impl/HistoryEventJsonConversion.java        |  51 ++++-
 .../apache/tez/dag/history/utils/DAGUtils.java  |  28 +--
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |   4 +-
 .../TestHistoryEventsProtoConversion.java       |  30 ++-
 .../impl/TestHistoryEventJsonConversion.java    |  12 +-
 .../tez/dag/history/utils/TestDAGUtils.java     |   5 +-
 .../ats/HistoryEventTimelineConversion.java     |  57 +++++-
 .../ats/TestHistoryEventTimelineConversion.java | 188 ++++++++++++++++++-
 20 files changed, 533 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca1da2e..5d6ebf5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -89,6 +89,7 @@ ALL CHANGES:
   invocations
   TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex]
   based affinity
+  TEZ-1716. Additional ATS data for UI.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index ab81683..58761d5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -50,9 +50,11 @@ public class ATSConstants {
   public static final String APP_SUBMIT_TIME = "appSubmitTime";
 
   /* Tez-specific info */
+  public static final String CONFIG = "config";
   public static final String DAG_PLAN = "dagPlan";
   public static final String DAG_NAME = "dagName";
   public static final String VERTEX_NAME = "vertexName";
+  public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping";
   public static final String SCHEDULED_TIME = "scheduledTime";
   public static final String INIT_REQUESTED_TIME = "initRequestedTime";
   public static final String INIT_TIME = "initTime";
@@ -62,6 +64,7 @@ public class ATSConstants {
   public static final String TIME_TAKEN = "timeTaken";
   public static final String STATUS = "status";
   public static final String DIAGNOSTICS = "diagnostics";
+  public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";
   public static final String COUNTERS = "counters";
   public static final String STATS = "stats";
   public static final String NUM_TASKS = "numTasks";
@@ -70,6 +73,8 @@ public class ATSConstants {
   public static final String NUM_SUCCEEDED_TASKS = "numSucceededTasks";
   public static final String NUM_FAILED_TASKS = "numFailedTasks";
   public static final String NUM_KILLED_TASKS = "numKilledTasks";
+  public static final String NUM_FAILED_TASKS_ATTEMPTS = "numFailedTaskAttempts";
+  public static final String NUM_KILLED_TASKS_ATTEMPTS = "numKilledTaskAttempts";
   public static final String PROCESSOR_CLASS_NAME = "processorClassName";
   public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
   public static final String COMPLETED_LOGS_URL = "completedLogsURL";

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 20da85b..789de24 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -145,6 +145,7 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
@@ -459,6 +460,12 @@ public class DAGAppMaster extends AbstractService {
     super.serviceInit(conf);
 
     if (!versionMismatch) {
+      if (this.appAttemptID.getAttemptId() == 1) {
+        AppLaunchedEvent appLaunchedEvent = new AppLaunchedEvent(appAttemptID.getApplicationId(),
+            startTime, appSubmitTime, appMasterUgi.getShortUserName(), this.amConf);
+        historyEventHandler.handle(
+            new DAGHistoryEvent(appLaunchedEvent));
+      }
       AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
           startTime, appSubmitTime, appMasterUgi.getShortUserName());
       historyEventHandler.handle(
@@ -749,7 +756,7 @@ public class DAGAppMaster extends AbstractService {
       if (LOG.isDebugEnabled()) {
         LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
             + ", json="
-            + DAGUtils.generateSimpleJSONPlan(dagPB, newDag.getVertexNameIDMapping()).toString());
+            + DAGUtils.generateSimpleJSONPlan(dagPB).toString());
       }
     } catch (JSONException e) {
       LOG.warn("Failed to generate json for DAG", e);
@@ -1915,7 +1922,7 @@ public class DAGAppMaster extends AbstractService {
     // for an app later
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
         submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
-        newDAG.getUserName(), newDAG.getVertexNameIDMapping());
+        newDAG.getUserName());
     try {
       historyEventHandler.handleCriticalEvent(
           new DAGHistoryEvent(newDAG.getID(), submittedEvent));

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index d3aecd4..6dccf3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -716,6 +717,40 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
+  private ProgressBuilder getDAGProgress() {
+    int totalTaskCount = 0;
+    int totalSucceededTaskCount = 0;
+    int totalRunningTaskCount = 0;
+    int totalFailedTaskCount = 0;
+    int totalKilledTaskCount = 0;
+    int totalFailedTaskAttemptCount = 0;
+    int totalKilledTaskAttemptCount = 0;
+    readLock.lock();
+    try {
+      for(Map.Entry<String, Vertex> entry : vertexMap.entrySet()) {
+        ProgressBuilder progress = entry.getValue().getVertexProgress();
+        totalTaskCount += progress.getTotalTaskCount();
+        totalSucceededTaskCount += progress.getSucceededTaskCount();
+        totalRunningTaskCount += progress.getRunningTaskCount();
+        totalFailedTaskCount += progress.getFailedTaskCount();
+        totalKilledTaskCount += progress.getKilledTaskCount();
+        totalFailedTaskAttemptCount += progress.getFailedTaskAttemptCount();
+        totalKilledTaskAttemptCount += progress.getKilledTaskAttemptCount();
+      }
+      ProgressBuilder dagProgress = new ProgressBuilder();
+      dagProgress.setTotalTaskCount(totalTaskCount);
+      dagProgress.setSucceededTaskCount(totalSucceededTaskCount);
+      dagProgress.setRunningTaskCount(totalRunningTaskCount);
+      dagProgress.setFailedTaskCount(totalFailedTaskCount);
+      dagProgress.setKilledTaskCount(totalKilledTaskCount);
+      dagProgress.setFailedTaskAttemptCount(totalFailedTaskAttemptCount);
+      dagProgress.setKilledTaskAttemptCount(totalKilledTaskAttemptCount);
+      return dagProgress;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public VertexStatusBuilder getVertexStatus(String vertexName,
       Set<StatusGetOpts> statusOptions) {
@@ -940,18 +975,32 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     finishTime = clock.getTime();
   }
 
+  private Map<String, Integer> constructTaskStats(ProgressBuilder progressBuilder) {
+    Map<String, Integer> taskStats = new HashMap<String, Integer>();
+    taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, progressBuilder.getTotalTaskCount());
+    taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, progressBuilder.getSucceededTaskCount());
+    taskStats.put(ATSConstants.NUM_FAILED_TASKS, progressBuilder.getFailedTaskCount());
+    taskStats.put(ATSConstants.NUM_KILLED_TASKS, progressBuilder.getKilledTaskCount());
+    taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS,
+        progressBuilder.getFailedTaskAttemptCount());
+    taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS,
+        progressBuilder.getKilledTaskAttemptCount());
+    return taskStats;
+  }
+
   void logJobHistoryFinishedEvent() throws IOException {
     this.setFinishTime();
+    Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
-        this.userName, this.dagName);
+        this.userName, this.dagName, taskStats);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }
 
   void logJobHistoryInitedEvent() {
     DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
-        this.initTime, this.userName, this.dagName);
+        this.initTime, this.userName, this.dagName, this.getVertexNameIDMapping());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(dagId, initEvt));
   }
@@ -964,10 +1013,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
+    Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         clock.getTime(), state,
         StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
-        getAllCounters(), this.userName, this.dagName);
+        getAllCounters(), this.userName, this.dagName, taskStats);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5c76a77..4edd12b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1520,6 +1520,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount);
     taskStats.put(ATSConstants.NUM_FAILED_TASKS, failedTaskCount);
     taskStats.put(ATSConstants.NUM_KILLED_TASKS, killedTaskCount);
+    taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get());
+    taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get());
 
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested,
         initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics,

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index fd747e0..17df58f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.history;
 
 public enum HistoryEventType {
+  APP_LAUNCHED,
   AM_LAUNCHED,
   AM_STARTED,
   DAG_SUBMITTED,

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
new file mode 100644
index 0000000..4c79c53
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+
+public class AppLaunchedEvent implements HistoryEvent {
+
+  private ApplicationId applicationId;
+  private long launchTime;
+  private long appSubmitTime;
+  private String user;
+  private Configuration conf;
+
+  public AppLaunchedEvent() {
+  }
+
+  public AppLaunchedEvent(ApplicationId appId,
+      long launchTime, long appSubmitTime, String user,
+      Configuration conf) {
+    this.applicationId = appId;
+    this.launchTime = launchTime;
+    this.appSubmitTime = appSubmitTime;
+    this.user = user;
+    this.conf = conf;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.APP_LAUNCHED;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return false;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    throw new UnsupportedOperationException("Not a recovery event");
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    throw new UnsupportedOperationException("Not a recovery event");
+  }
+
+  @Override
+  public String toString() {
+    return "applicationId=" + applicationId
+        + ", appSubmitTime=" + appSubmitTime
+        + ", launchTime=" + launchTime;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  public long getAppSubmitTime() {
+    return appSubmitTime;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 21199f4..e05f043 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,13 +52,15 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   private String user;
   private String dagName;
 
+  Map<String, Integer> dagTaskStats;
+
   public DAGFinishedEvent() {
   }
 
   public DAGFinishedEvent(TezDAGID dagId, long startTime,
       long finishTime, DAGState state,
       String diagnostics, TezCounters counters,
-      String user, String dagName) {
+      String user, String dagName, Map<String, Integer> dagTaskStats) {
     this.dagID = dagId;
     this.startTime = startTime;
     this.finishTime = finishTime;
@@ -66,6 +69,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
     this.tezCounters = counters;
     this.user = user;
     this.dagName = dagName;
+    this.dagTaskStats = dagTaskStats;
   }
 
   @Override
@@ -194,4 +198,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   public String getDagName() {
     return dagName;
   }
+
+  public Map<String, Integer> getDagTaskStats() {
+    return dagTaskStats;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 6e17da8..98d64d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -21,10 +21,12 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 
 public class DAGInitializedEvent implements HistoryEvent {
@@ -33,16 +35,18 @@ public class DAGInitializedEvent implements HistoryEvent {
   private long initTime;
   private String user;
   private String dagName;
+  private Map<String, TezVertexID> vertexNameIDMap;
 
   public DAGInitializedEvent() {
   }
 
   public DAGInitializedEvent(TezDAGID dagID, long initTime,
-      String user, String dagName) {
+      String user, String dagName, Map<String, TezVertexID> vertexNameIDMap) {
     this.dagID = dagID;
     this.initTime = initTime;
     this.user = user;
     this.dagName = dagName;
+    this.vertexNameIDMap = vertexNameIDMap;
   }
 
   @Override
@@ -109,4 +113,8 @@ public class DAGInitializedEvent implements HistoryEvent {
     return dagName;
   }
 
+  public Map<String, TezVertexID> getVertexNameIDMap() {
+    return vertexNameIDMap;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 0074a4e..7f0fab3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -54,7 +54,6 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   private ApplicationAttemptId applicationAttemptId;
   private String user;
   private Map<String, LocalResource> cumulativeAdditionalLocalResources;
-  private Map<String, TezVertexID> vertexNameIDMap;
 
   public DAGSubmittedEvent() {
   }
@@ -62,7 +61,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
       DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
       Map<String, LocalResource> cumulativeAdditionalLocalResources,
-      String user, Map<String, TezVertexID> vertexNameIDMap) {
+      String user) {
     this.dagID = dagID;
     this.dagName = dagPlan.getName();
     this.submitTime = submitTime;
@@ -70,7 +69,6 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     this.applicationAttemptId = applicationAttemptId;
     this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
     this.user = user;
-    this.vertexNameIDMap = vertexNameIDMap;
   }
 
   @Override
@@ -185,8 +183,4 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     return user;
   }
 
-  public Map<String, TezVertexID> getVertexNameIDMap() {
-    return vertexNameIDMap;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 9323270..c367d5c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -56,6 +56,7 @@ public class TaskFinishedEvent implements HistoryEvent {
     this.state = state;
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
+    this.successfulAttemptID = successfulAttemptID;
   }
 
   public TaskFinishedEvent() {

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
index 00cac28..e2f0882 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.history.logging;
 
 public enum EntityTypes {
+  TEZ_APPLICATION,
   TEZ_APPLICATION_ATTEMPT,
   TEZ_CONTAINER_ID,
   TEZ_DAG_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index ad42392..0b6f9d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -30,6 +31,7 @@ import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
@@ -60,6 +62,9 @@ public class HistoryEventJsonConversion {
     }
     JSONObject jsonObject = null;
     switch (historyEvent.getEventType()) {
+      case APP_LAUNCHED:
+        jsonObject = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
+        break;
       case AM_LAUNCHED:
         jsonObject = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
         break;
@@ -122,6 +127,24 @@ public class HistoryEventJsonConversion {
     return jsonObject;
   }
 
+  private static JSONObject convertAppLaunchedEvent(AppLaunchedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getApplicationId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION.name());
+
+    // Other info to tag with Tez App
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.USER, event.getUser());
+    otherInfo.put(ATSConstants.CONFIG, new JSONObject(
+        DAGUtils.convertConfigurationToATSMap(event.getConf())));
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
   private static JSONObject convertAMLaunchedEvent(AMLaunchedEvent event) throws JSONException {
     JSONObject jsonObject = new JSONObject();
     jsonObject.put(ATSConstants.ENTITY,
@@ -307,6 +330,14 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getTezCounters()));
+
+    final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
+    if (dagTaskStats != null) {
+      for(Entry<String, Integer> entry : dagTaskStats.entrySet()) {
+        otherInfo.put(entry.getKey(), entry.getValue().intValue());
+      }
+    }
+
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;
@@ -330,6 +361,17 @@ public class HistoryEventJsonConversion {
     events.put(initEvent);
     jsonObject.put(ATSConstants.EVENTS, events);
 
+    JSONObject otherInfo = new JSONObject();
+
+    if (event.getVertexNameIDMap() != null) {
+      Map<String, String> nameIdStrMap = new TreeMap<String, String>();
+      for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
+        nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
+      }
+      otherInfo.put(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
+    }
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
     return jsonObject;
   }
 
@@ -411,7 +453,7 @@ public class HistoryEventJsonConversion {
     // Other info such as dag plan
     JSONObject otherInfo = new JSONObject();
     otherInfo.put(ATSConstants.DAG_PLAN,
-        DAGUtils.generateSimpleJSONPlan(event.getDAGPlan(), event.getVertexNameIDMap()));
+        DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;
@@ -510,6 +552,9 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getTezCounters()));
+    if (event.getSuccessfulAttemptID() != null) {
+      otherInfo.put(ATSConstants.SUCCESSFUL_ATTEMPT_ID, event.getSuccessfulAttemptID().toString());
+    }
 
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
@@ -576,8 +621,8 @@ public class HistoryEventJsonConversion {
 
     final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
     if (vertexTaskStats != null) {
-      for(String key : vertexTaskStats.keySet()) {
-        otherInfo.put(key, vertexTaskStats.get(key));
+      for(Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
+        otherInfo.put(entry.getKey(), entry.getValue().intValue());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 115f739..0bcbcbe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -22,9 +22,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
@@ -85,11 +89,10 @@ public class DAGUtils {
 
 
 
-  public static JSONObject generateSimpleJSONPlan(DAGPlan dagPlan,
-      Map<String, TezVertexID> vertexNameIDMap) throws JSONException {
+  public static JSONObject generateSimpleJSONPlan(DAGPlan dagPlan) throws JSONException {
     JSONObject dagJson;
     try {
-      dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan, vertexNameIDMap));
+      dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -129,8 +132,7 @@ public class DAGUtils {
     return object;
   }
 
-  public static Map<String,Object> convertDAGPlanToATSMap(
-      DAGPlan dagPlan, Map<String, TezVertexID> vertexNameIDMap) throws IOException {
+  public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException {
 
     final String VERSION_KEY = "version";
     final int version = 1;
@@ -141,12 +143,6 @@ public class DAGUtils {
     for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
       Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
       vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());
-      if (vertexNameIDMap != null && !vertexNameIDMap.isEmpty()) {
-        TezVertexID vertexID = vertexNameIDMap.get(vertexPlan.getName());
-        if (vertexID != null) {
-          vertexMap.put(VERTEX_ID_KEY, vertexID.toString());
-        }
-      }
       if (vertexPlan.hasProcessorDescriptor()) {
         vertexMap.put(PROCESSOR_CLASS_KEY,
             vertexPlan.getProcessorDescriptor().getClassName());
@@ -361,4 +357,14 @@ public class DAGUtils {
     return jsonDescriptor;
   }
 
+  /** Copies every entry yielded by {@code conf.iterator()} into a new key-sorted map. */
+  public static Map<String, String> convertConfigurationToATSMap(Configuration conf) {
+    Map<String, String> sortedProps = new TreeMap<String, String>();
+    for (Iterator<Entry<String, String>> it = conf.iterator(); it.hasNext(); ) {
+      Entry<String, String> property = it.next();
+      sortedProps.put(property.getKey(), property.getValue());
+    }
+    return sortedProps;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index f05a330..1edefa7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -100,7 +100,7 @@ public class TestDAGRecovery {
   private void restoreFromDAGInitializedEvent() {
     DAGState recoveredState =
         dag.restoreFromEvent(new DAGInitializedEvent(dagId, initTime, user,
-            dagName));
+            dagName, null));
     assertEquals(DAGState.INITED, recoveredState);
     assertEquals(initTime, dag.initTime);
     assertEquals(6, dag.getVertices().size());
@@ -144,7 +144,7 @@ public class TestDAGRecovery {
   private void restoreFromDAGFinishedEvent(DAGState finalState) {
     DAGState recoveredState =
         dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime,
-            finalState, "", tezCounters, user, dagName));
+            finalState, "", tezCounters, user, dagName, null));
     assertEquals(finishTime, dag.finishTime);
     assertFalse(dag.recoveryCommitInProgress);
     assertEquals(finalState, recoveredState);

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index ad508b6..a7a23db 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -18,9 +18,12 @@
 
 package org.apache.tez.dag.history.events;
 
+import static org.junit.Assert.fail;
+
 import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -114,6 +117,20 @@ public class TestHistoryEventsProtoConversion {
     LOG.info("Deserialized Event toString: " + deserializedEvent.toString());
   }
 
+  /** Verifies that proto conversion of an AppLaunchedEvent is rejected. */
+  private void testAppLaunchedEvent() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    AppLaunchedEvent launchedEvent =
+        new AppLaunchedEvent(appId, 100, 100, null, new Configuration(false));
+    try {
+      testProtoConversion(launchedEvent);
+      fail("Expected to fail on conversion");
+    } catch (UnsupportedOperationException expected) {
+      // Expected: the conversion call above must throw for this event.
+    }
+    LOG.info("Initial Event toString: " + launchedEvent.toString());
+  }
+
   private void testAMLaunchedEvent() throws Exception {
     AMLaunchedEvent event = new AMLaunchedEvent(
         ApplicationAttemptId.newInstance(
@@ -148,7 +165,7 @@ public class TestHistoryEventsProtoConversion {
         ApplicationId.newInstance(0, 1), 1), 1001l,
         DAGPlan.newBuilder().setName("foo").build(),
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), null, "", null);
+            ApplicationId.newInstance(0, 1), 1), null, "");
     DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -167,7 +184,7 @@ public class TestHistoryEventsProtoConversion {
   private void testDAGInitializedEvent() throws Exception {
     DAGInitializedEvent event = new DAGInitializedEvent(
         TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
-        "user", "dagName");
+        "user", "dagName", null);
     DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getDagID(),
@@ -192,7 +209,7 @@ public class TestHistoryEventsProtoConversion {
     {
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
-          DAGState.FAILED, null, null, "user", "dagName");
+          DAGState.FAILED, null, null, "user", "dagName", null);
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(
@@ -213,7 +230,7 @@ public class TestHistoryEventsProtoConversion {
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
           DAGState.FAILED, "bad diagnostics", tezCounters,
-          "user", "dagName");
+          "user", "dagName", null);
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(
@@ -531,7 +548,7 @@ public class TestHistoryEventsProtoConversion {
       event = new VertexRecoverableEventsGeneratedEvent(
           TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
-      Assert.fail("Invalid creation should have errored out");
+      fail("Invalid creation should have errored out");
     } catch (RuntimeException e) {
       // Expected
     }
@@ -617,6 +634,9 @@ public class TestHistoryEventsProtoConversion {
   public void testDefaultProtoConversion() throws Exception {
     for (HistoryEventType eventType : HistoryEventType.values()) {
       switch (eventType) {
+        case APP_LAUNCHED:
+          testAppLaunchedEvent();
+          break;
         case AM_LAUNCHED:
           testAMLaunchedEvent();
           break;

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index c9384e1..c3d51c3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -38,6 +39,7 @@ import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
@@ -101,6 +103,10 @@ public class TestHistoryEventJsonConversion {
     for (HistoryEventType eventType : HistoryEventType.values()) {
       HistoryEvent event = null;
       switch (eventType) {
+        case APP_LAUNCHED:
+          event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
+              user, new Configuration(false));
+          break;
         case AM_LAUNCHED:
           event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
               user);
@@ -110,17 +116,17 @@ public class TestHistoryEventJsonConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user, null);
+              null, user);
           break;
         case DAG_INITIALIZED:
-          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
           break;
         case DAG_STARTED:
           event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
           break;
         case DAG_FINISHED:
           event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
-              null, null, user, dagPlan.getName());
+              null, null, user, dagPlan.getName(), null);
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index e50c67c..6f4202a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -108,7 +108,7 @@ public class TestDAGUtils {
     idNameMap.put("vertex2", vId2);
     idNameMap.put("vertex3", vId3);
 
-    Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan, idNameMap);
+    Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);
     Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
     Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
     Assert.assertTrue(atsMap.containsKey("version"));
@@ -128,10 +128,7 @@ public class TestDAGUtils {
     for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY))) {
       Map<String, Object> v = (Map<String, Object>) o;
       Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY));
-      Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_ID_KEY));
-      String vId = (String)v.get(DAGUtils.VERTEX_ID_KEY);
       String vName = (String)v.get(DAGUtils.VERTEX_NAME_KEY);
-      Assert.assertEquals(idNameMap.get(vName).toString(), vId);
       Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY));
       Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 1d569d6..a492408 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -32,6 +33,7 @@ import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
@@ -48,6 +50,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezVertexID;
 
 public class HistoryEventTimelineConversion {
 
@@ -58,6 +61,9 @@ public class HistoryEventTimelineConversion {
     }
     TimelineEntity timelineEntity = null;
     switch (historyEvent.getEventType()) {
+      case APP_LAUNCHED:
+        timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
+        break;
       case AM_LAUNCHED:
         timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
         break;
@@ -121,6 +127,24 @@ public class HistoryEventTimelineConversion {
     return timelineEntity;
   }
 
+  /** Builds the TEZ_APPLICATION timeline entity for an application launch event. */
+  private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) {
+    final String appIdStr = event.getApplicationId().toString();
+    final String appUser = event.getUser();
+
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId("tez_" + appIdStr);
+    entity.setEntityType(EntityTypes.TEZ_APPLICATION.name());
+
+    // Application id and user are related entities; user is also a primary filter.
+    entity.addRelatedEntity(ATSConstants.APPLICATION_ID, appIdStr);
+    entity.addRelatedEntity(ATSConstants.USER, appUser);
+    entity.addPrimaryFilter(ATSConstants.USER, appUser);
+    entity.addOtherInfo(ATSConstants.CONFIG,
+        DAGUtils.convertConfigurationToATSMap(event.getConf()));
+    return entity;
+  }
+
   private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId("tez_"
@@ -234,6 +258,13 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToATSMap(event.getTezCounters()));
 
+    final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
+    if (dagTaskStats != null) {
+      for(Entry<String, Integer> entry : dagTaskStats.entrySet()) {
+        atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue());
+      }
+    }
+
     return atsEntity;
   }
 
@@ -242,16 +273,24 @@ public class HistoryEventTimelineConversion {
     atsEntity.setEntityId(event.getDagID().toString());
     atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
 
-    TimelineEvent finishEvt = new TimelineEvent();
-    finishEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
-    finishEvt.setTimestamp(event.getInitTime());
-    atsEntity.addEvent(finishEvt);
+    TimelineEvent initEvt = new TimelineEvent();
+    initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
+    initEvt.setTimestamp(event.getInitTime());
+    atsEntity.addEvent(initEvt);
 
     atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
     atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
 
     atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
 
+    if (event.getVertexNameIDMap() != null) {
+      Map<String, String> nameIdStrMap = new TreeMap<String, String>();
+      for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
+        nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
+      }
+      atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
+    }
+
     return atsEntity;
   }
 
@@ -301,7 +340,7 @@ public class HistoryEventTimelineConversion {
 
     try {
       atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
-          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan(), event.getVertexNameIDMap()));
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -396,6 +435,10 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
     atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
     atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    if (event.getSuccessfulAttemptID() != null) {
+      atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+          event.getSuccessfulAttemptID().toString());
+    }
 
     atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     atsEntity.addOtherInfo(ATSConstants.COUNTERS,
@@ -459,8 +502,8 @@ public class HistoryEventTimelineConversion {
 
     final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
     if (vertexTaskStats != null) {
-      for(String key : vertexTaskStats.keySet()) {
-        atsEntity.addOtherInfo(key, vertexTaskStats.get(key));
+      for(Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
+        atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 8f95c1e..ba71d46 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -35,10 +36,12 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
@@ -101,6 +104,10 @@ public class TestHistoryEventTimelineConversion {
     for (HistoryEventType eventType : HistoryEventType.values()) {
       HistoryEvent event = null;
       switch (eventType) {
+        case APP_LAUNCHED:
+          event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
+              user, new Configuration(false));
+          break;
         case AM_LAUNCHED:
           event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
               user);
@@ -110,17 +117,17 @@ public class TestHistoryEventTimelineConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user, null);
+              null, user);
           break;
         case DAG_INITIALIZED:
-          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
           break;
         case DAG_STARTED:
           event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
           break;
         case DAG_FINISHED:
           event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
-              null, null, user, dagPlan.getName());
+              null, null, user, dagPlan.getName(), null);
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
@@ -185,6 +192,41 @@ public class TestHistoryEventTimelineConversion {
   }
 
   @Test
+  public void testConvertAppLaunchedEvent() {
+    long launchTime = random.nextLong();
+    long submitTime = random.nextLong();
+    // Two arbitrary properties that must survive the conversion unchanged.
+    Configuration appConf = new Configuration(false);
+    appConf.set("foo", "bar");
+    appConf.set("applicationId", "1234");
+    AppLaunchedEvent launchedEvent =
+        new AppLaunchedEvent(applicationId, launchTime, submitTime, user, appConf);
+
+    TimelineEntity entity =
+        HistoryEventTimelineConversion.convertToTimelineEntity(launchedEvent);
+    Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), entity.getEntityType());
+    Assert.assertEquals("tez_" + applicationId.toString(), entity.getEntityId());
+
+    // Related entities: exactly the user and the application id.
+    Assert.assertEquals(2, entity.getRelatedEntities().size());
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+
+    // Primary filters: only the user.
+    Assert.assertEquals(1, entity.getPrimaryFilters().size());
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    // Other info: only the configuration map.
+    Assert.assertEquals(1, entity.getOtherInfo().size());
+    Assert.assertTrue(entity.getOtherInfo().containsKey(ATSConstants.CONFIG));
+    Map<String, String> publishedConf =
+        (Map<String, String>) entity.getOtherInfo().get(ATSConstants.CONFIG);
+    Assert.assertEquals(appConf.get("foo"), publishedConf.get("foo"));
+    Assert.assertEquals(appConf.get("applicationId"), publishedConf.get("applicationId"));
+  }
+
+  @Test
   public void testConvertContainerLaunchedEvent() {
     long launchTime = random.nextLong();
     ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
@@ -219,7 +261,7 @@ public class TestHistoryEventTimelineConversion {
     long submitTime = random.nextLong();
 
     DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
-        applicationAttemptId, null, user, null);
+        applicationAttemptId, null, user);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
@@ -263,6 +305,92 @@ public class TestHistoryEventTimelineConversion {
   }
 
   @Test
+  public void testConvertDAGInitializedEvent() {
+    long initTime = random.nextLong();
+    Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
+    nameIdMap.put("foo", tezVertexID);
+
+    // Build the event with the same user value that the filter assertion below checks.
+    DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, user, "dagName",
+        nameIdMap);
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(initTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName"));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    // The vertex name -> id mapping must be published as strings in otherInfo.
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(
+        ATSConstants.VERTEX_NAME_ID_MAPPING));
+    Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get(
+        ATSConstants.VERTEX_NAME_ID_MAPPING);
+    Assert.assertEquals(1, vIdMap.size());
+    // containsKey yields a boolean, so assertTrue is the check that can actually fail.
+    Assert.assertTrue(vIdMap.containsKey("foo"));
+    Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo"));
+  }
+
+  @Test
+  public void testConvertDAGFinishedEvent() {
+    long finishTime = random.nextLong();
+    long startTime = random.nextLong();
+    // Arbitrary task counters that should be copied into otherInfo as-is.
+    Map<String,Integer> dagTaskStats = new HashMap<String, Integer>();
+    dagTaskStats.put("FOO", 100);
+    dagTaskStats.put("BAR", 200);
+
+    DAGFinishedEvent finishedEvent = new DAGFinishedEvent(tezDAGID, startTime, finishTime,
+        DAGState.ERROR, "diagnostics", null, user, dagPlan.getName(), dagTaskStats);
+
+    TimelineEntity entity =
+        HistoryEventTimelineConversion.convertToTimelineEntity(finishedEvent);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), entity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), entity.getEntityId());
+
+    Assert.assertEquals(0, entity.getRelatedEntities().size());
+
+    // A single DAG_FINISHED event stamped with the finish time.
+    Assert.assertEquals(1, entity.getEvents().size());
+    TimelineEvent finishedEvt = entity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), finishedEvt.getEventType());
+    Assert.assertEquals(finishTime, finishedEvt.getTimestamp());
+
+    // Primary filters: DAG name, user and final status.
+    Assert.assertEquals(3, entity.getPrimaryFilters().size());
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.DAG_NAME)
+        .contains(dagPlan.getName()));
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.STATUS)
+        .contains(DAGState.ERROR.name()));
+
+    // Timing, counters, status and diagnostics are reported through otherInfo.
+    Assert.assertEquals(startTime,
+        ((Long) entity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+    Assert.assertEquals(finishTime,
+        ((Long) entity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+    Assert.assertEquals(finishTime - startTime,
+        ((Long) entity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+    Assert.assertTrue(entity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
+    Assert.assertEquals(DAGState.ERROR.name(), entity.getOtherInfo().get(ATSConstants.STATUS));
+    Assert.assertEquals("diagnostics", entity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
+    // Each task stat is published under its own key.
+    Assert.assertEquals(100, ((Integer) entity.getOtherInfo().get("FOO")).intValue());
+    Assert.assertEquals(200, ((Integer) entity.getOtherInfo().get("BAR")).intValue());
+  }
+
+  @Test
   public void testConvertVertexInitializedEvent() {
     long initRequestedTime = random.nextLong();
     long initedTime = random.nextLong();
@@ -306,6 +434,58 @@ public class TestHistoryEventTimelineConversion {
   }
 
   @Test
+  public void testConvertVertexFinishedEvent() {
+    long initRequestedTime = random.nextLong();
+    long initedTime = random.nextLong();
+    long startRequestedTime = random.nextLong();
+    long startTime = random.nextLong();
+    long finishTime = random.nextLong();
+
+    // Arbitrary task counters that should be copied into otherInfo as-is.
+    Map<String,Integer> vertexTaskStats = new HashMap<String, Integer>();
+    vertexTaskStats.put("FOO", 100);
+    vertexTaskStats.put("BAR", 200);
+
+    VertexFinishedEvent finishedEvent = new VertexFinishedEvent(tezVertexID, "v1",
+        initRequestedTime, initedTime, startRequestedTime, startTime, finishTime,
+        VertexState.ERROR, "diagnostics", null, new VertexStats(), vertexTaskStats);
+
+    TimelineEntity entity =
+        HistoryEventTimelineConversion.convertToTimelineEntity(finishedEvent);
+    Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), entity.getEntityType());
+    Assert.assertEquals(tezVertexID.toString(), entity.getEntityId());
+
+    Assert.assertEquals(0, entity.getRelatedEntities().size());
+
+    // Primary filters: the DAG id and the final status.
+    Assert.assertEquals(2, entity.getPrimaryFilters().size());
+    Assert.assertTrue(entity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name())
+        .contains(tezDAGID.toString()));
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.STATUS)
+        .contains(VertexState.ERROR.name()));
+
+    // A single VERTEX_FINISHED event stamped with the finish time.
+    Assert.assertEquals(1, entity.getEvents().size());
+    TimelineEvent finishedEvt = entity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), finishedEvt.getEventType());
+    Assert.assertEquals(finishTime, finishedEvt.getTimestamp());
+
+    // Timing, status, diagnostics and stats are reported through otherInfo.
+    Assert.assertEquals(finishTime,
+        ((Long) entity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+    Assert.assertEquals(finishTime - startTime,
+        ((Long) entity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+    Assert.assertEquals(VertexState.ERROR.name(),
+        entity.getOtherInfo().get(ATSConstants.STATUS));
+    Assert.assertEquals("diagnostics", entity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+    Assert.assertTrue(entity.getOtherInfo().containsKey(ATSConstants.STATS));
+
+    // Each task stat is published under its own key.
+    Assert.assertEquals(100, ((Integer) entity.getOtherInfo().get("FOO")).intValue());
+    Assert.assertEquals(200, ((Integer) entity.getOtherInfo().get("BAR")).intValue());
+  }
+
+  @Test
   public void testConvertTaskStartedEvent() {
     long scheduleTime = random.nextLong();
     long startTime = random.nextLong();