You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/14 19:36:26 UTC

git commit: TEZ-930. Provide additional aggregated task stats at the vertex level. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 537d3bca4 -> 6a2585764


TEZ-930. Provide additional aggregated task stats at the vertex level. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6a258576
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6a258576
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6a258576

Branch: refs/heads/master
Commit: 6a2585764c64d0b70be34b59fab1c964a54588b4
Parents: 537d3bc
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Mar 14 11:36:04 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Mar 14 11:36:04 2014 -0700

----------------------------------------------------------------------
 .../tez/dag/api/oldrecords/TaskReport.java      |  27 +--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  25 +--
 .../tez/dag/app/dag/impl/TaskReportImpl.java    |  98 +++++++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  37 +++-
 .../tez/dag/app/dag/impl/VertexStats.java       | 169 +++++++++++++++++++
 .../dag/history/events/VertexFinishedEvent.java |  16 +-
 .../tez/dag/history/utils/ATSConstants.java     |   1 +
 .../apache/tez/dag/history/utils/DAGUtils.java  |  52 ++++++
 .../tez/dag/app/dag/impl/TestVertexStats.java   | 159 +++++++++++++++++
 .../TestHistoryEventsProtoConversion.java       |   5 +-
 10 files changed, 531 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
index 3a0685f..76bfc96 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
@@ -30,34 +30,11 @@ public interface TaskReport {
   public abstract float getProgress();
   public abstract long getStartTime();
   public abstract long getFinishTime();
-  public abstract TezCounters getCounters();
-  
-  public abstract List<TezTaskAttemptID> getRunningAttemptsList();
-  public abstract TezTaskAttemptID getRunningAttempt(int index);
-  public abstract int getRunningAttemptsCount();
-  
-  public abstract TezTaskAttemptID getSuccessfulAttempt();
-  
-  public abstract List<String> getDiagnosticsList();
-  public abstract String getDiagnostics(int index);
-  public abstract int getDiagnosticsCount();
-  
-  
+
   public abstract void setTaskId(TezTaskID taskId);
   public abstract void setTaskState(TaskState taskState);
   public abstract void setProgress(float progress);
   public abstract void setStartTime(long startTime);
   public abstract void setFinishTime(long finishTime);
-  public abstract void setCounters(TezCounters counters);
-  
-  public abstract void addAllRunningAttempts(List<TezTaskAttemptID> taskAttempts);
-  public abstract void addRunningAttempt(TezTaskAttemptID taskAttempt);
-  public abstract void removeRunningAttempt(int index);
-  public abstract void clearRunningAttempts();
-  
-  public abstract void setSuccessfulAttempt(TezTaskAttemptID taskAttempt);
-  public abstract void addAllDiagnostics(List<String> diagnostics);
-  public abstract void addDiagnostics(String diagnostics);
-  public abstract void removeDiagnostics(int index);
-  public abstract void clearDiagnostics();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 16c063a..f6b3faa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -408,9 +408,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public TaskReport getReport() {
-    // TODO TEZPB This is broken. Records will not work without the PBImpl, which
-    // is in a different package.
-    TaskReport report = Records.newRecord(TaskReport.class);
+    TaskReport report = new TaskReportImpl();
     readLock.lock();
     try {
       report.setTaskId(taskId);
@@ -418,27 +416,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       report.setFinishTime(getFinishTime());
       report.setTaskState(getState());
       report.setProgress(getProgress());
-
-      for (TaskAttempt attempt : attempts.values()) {
-        if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
-          report.addRunningAttempt(attempt.getID());
-        }
-      }
-
-      report.setSuccessfulAttempt(successfulAttempt);
-
-      for (TaskAttempt att : attempts.values()) {
-        String prefix = "AttemptID:" + att.getID() + " Info:";
-        for (CharSequence cs : att.getDiagnostics()) {
-          report.addDiagnostics(prefix + cs);
-
-        }
-      }
-
-      // Add a copy of counters as the last step so that their lifetime on heap
-      // is as small as possible.
-      report.setCounters(getCounters());
-
       return report;
     } finally {
       readLock.unlock();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java
new file mode 100644
index 0000000..1a2cee3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.oldrecords.TaskReport;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+import java.util.List;
+
+public class TaskReportImpl implements TaskReport { // Plain in-memory TaskReport: five mutable fields with matching getters and setters.
+
+  private TezTaskID taskID; // null until supplied through the constructor or setTaskId
+  private TaskState taskState; // null until supplied through the constructor or setTaskState
+  private float progress; // 0.0f until set
+  private long startTime; // 0 until set
+  private long finishTime; // 0 until set
+
+  public TaskReportImpl() { // Leaves every field at its Java default; values are expected to arrive through the setters.
+  }
+
+  public TaskReportImpl(TezTaskID taskID, TaskState taskState,
+      float progress, long startTime, long finishTime) { // Populates all five fields in one call; no validation is performed.
+    this.taskID = taskID;
+    this.taskState = taskState;
+    this.progress = progress;
+    this.startTime = startTime;
+    this.finishTime = finishTime;
+  }
+
+  @Override
+  public TezTaskID getTaskId() { // May return null if the id was never set.
+    return taskID;
+  }
+
+  @Override
+  public TaskState getTaskState() { // May return null if the state was never set.
+    return taskState;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  @Override
+  public void setTaskId(TezTaskID taskId) {
+    this.taskID = taskId;
+  }
+
+  @Override
+  public void setTaskState(TaskState taskState) {
+    this.taskState = taskState;
+  }
+
+  @Override
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @Override
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7b3b6b4..d3294ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -536,6 +536,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   // Recovery related flags
   boolean recoveryInitEventSeen = false;
   boolean recoveryStartEventSeen = false;
+  private VertexStats vertexStats = null;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration conf, EventHandler eventHandler,
@@ -719,6 +720,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  public VertexStats getVertexStats() { // Returns aggregated task stats: the stored final stats for a finished vertex, a fresh snapshot otherwise.
+
+    readLock.lock();
+    try {
+      VertexState state = getInternalState();
+      if (state == VertexState.ERROR || state == VertexState.FAILED
+          || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { // the four states treated as finished here
+        this.mayBeConstructFinalFullCounters(); // NOTE(review): presumably fills this.vertexStats on first use (constructFinalFullcounters assigns it) while only the read lock is held - confirm
+        return this.vertexStats;
+      }
+
+      VertexStats stats = new VertexStats(); // any other state: build a new snapshot from the current task reports, nothing is stored
+      return updateVertexStats(stats, tasks.values());
+
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+
   public static TezCounters incrTaskCounters(
       TezCounters counters, Collection<Task> tasks) {
     for (Task task : tasks) {
@@ -727,6 +748,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return counters;
   }
 
+  public static VertexStats updateVertexStats( // Folds the current report of every given task into stats.
+      VertexStats stats, Collection<Task> tasks) {
+    for (Task task : tasks) {
+      stats.updateStats(task.getReport()); // one report per task, in the collection's iteration order
+    }
+    return stats; // the same instance that was passed in
+  }
+
+
   @Override
   public List<String> getDiagnostics() {
     readLock.lock();
@@ -1235,7 +1265,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, finishTime, VertexState.SUCCEEDED, "",
-        getAllCounters());
+        getAllCounters(), getVertexStats());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
@@ -1244,7 +1274,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
-            getDiagnostics()), getAllCounters());
+            getDiagnostics()), getAllCounters(), getVertexStats());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
@@ -2476,7 +2506,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @Private
   public void constructFinalFullcounters() {
     this.fullCounters = new TezCounters();
+    this.vertexStats = new VertexStats();
+
     for (Task t : this.tasks.values()) {
+      vertexStats.updateStats(t.getReport());
       TezCounters counters = t.getCounters();
       this.fullCounters.incrAllCounters(counters);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java
new file mode 100644
index 0000000..86d1797
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.tez.dag.api.oldrecords.TaskReport;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+public class VertexStats { // Running aggregate of per-task timing for one vertex, fed one TaskReport at a time through updateStats.
+
+  long firstTaskStartTime = -1; // -1 means no started task has been seen yet
+  Set<TezTaskID> firstTasksToStart = new HashSet<TezTaskID>(); // every task tied at firstTaskStartTime
+  long lastTaskFinishTime = -1; // -1 means no finished task has been seen yet
+  Set<TezTaskID> lastTasksToFinish = new HashSet<TezTaskID>(); // every task tied at lastTaskFinishTime
+
+  long minTaskDuration = -1; // duration figures cover SUCCEEDED tasks only and stay -1 until the first one
+  long maxTaskDuration = -1;
+  double avgTaskDuration = -1;
+  long numSuccessfulTasks = 0; // SUCCEEDED tasks with a non-negative duration folded in so far
+
+  Set<TezTaskID> shortestDurationTasks = new HashSet<TezTaskID>(); // tasks tied at minTaskDuration
+  Set<TezTaskID> longestDurationTasks = new HashSet<TezTaskID>(); // tasks tied at maxTaskDuration
+
+  public long getFirstTaskStartTime() { // -1 if no started task has been recorded
+    return firstTaskStartTime;
+  }
+
+  public Set<TezTaskID> getFirstTaskToStart() { // read-only view; empty, never null
+    return Collections.unmodifiableSet(firstTasksToStart);
+  }
+
+  public long getLastTaskFinishTime() { // -1 if no finished task has been recorded
+    return lastTaskFinishTime;
+  }
+
+  public Set<TezTaskID> getLastTaskToFinish() { // read-only view; empty, never null
+    return Collections.unmodifiableSet(lastTasksToFinish);
+  }
+
+  public long getMinTaskDuration() { // -1 if no successful task has been recorded
+    return minTaskDuration;
+  }
+
+  public long getMaxTaskDuration() { // -1 if no successful task has been recorded
+    return maxTaskDuration;
+  }
+
+  public double getAvgTaskDuration() { // -1 if no successful task has been recorded
+    return avgTaskDuration;
+  }
+
+  public Set<TezTaskID> getShortestDurationTask() { // read-only view; empty, never null
+    return Collections.unmodifiableSet(shortestDurationTasks);
+  }
+
+  public Set<TezTaskID> getLongestDurationTask() { // read-only view; empty, never null
+    return Collections.unmodifiableSet(longestDurationTasks);
+  }
+
+  void updateStats(TaskReport taskReport) { // Folds one task report into the aggregates; ties accumulate in the ID sets.
+    if (taskReport.getStartTime() > 0 && (firstTaskStartTime == -1 // skip tasks that never started, mirroring the finish-time guard below
+      || firstTaskStartTime >= taskReport.getStartTime())) {
+      if (firstTaskStartTime != taskReport.getStartTime()) {
+        firstTasksToStart.clear(); // strictly earlier start: previous holders no longer qualify
+      }
+      firstTasksToStart.add(taskReport.getTaskId());
+      firstTaskStartTime = taskReport.getStartTime();
+    }
+    if ((taskReport.getFinishTime() > 0) &&
+        (lastTaskFinishTime == -1
+        || lastTaskFinishTime <= taskReport.getFinishTime())) {
+      if (lastTaskFinishTime != taskReport.getFinishTime()) {
+        lastTasksToFinish.clear(); // strictly later finish: previous holders no longer qualify
+      }
+      lastTasksToFinish.add(taskReport.getTaskId());
+      lastTaskFinishTime = taskReport.getFinishTime();
+    }
+
+    if (!TaskState.SUCCEEDED.equals(
+        taskReport.getTaskState())) { // constant-first comparison so a report whose state was never set cannot throw
+      // ignore non-successful tasks when calculating durations
+      return;
+    }
+
+    long taskDuration = taskReport.getFinishTime() -
+        taskReport.getStartTime();
+    if (taskDuration < 0) { // inconsistent timestamps: leave the duration figures untouched
+      return;
+    }
+
+    ++numSuccessfulTasks;
+    if (minTaskDuration == -1
+      || minTaskDuration >= taskDuration) {
+      if (minTaskDuration != taskDuration) {
+        shortestDurationTasks.clear(); // strictly shorter: start a new tie set
+      }
+      minTaskDuration = taskDuration;
+      shortestDurationTasks.add(taskReport.getTaskId());
+    }
+    if (maxTaskDuration == -1
+      || maxTaskDuration <= taskDuration) {
+      if (maxTaskDuration != taskDuration) {
+        longestDurationTasks.clear(); // strictly longer: start a new tie set
+      }
+      maxTaskDuration = taskDuration;
+      longestDurationTasks.add(taskReport.getTaskId());
+    }
+
+    avgTaskDuration = ((avgTaskDuration * (numSuccessfulTasks-1)) + taskDuration) // incremental mean; the initial -1 is multiplied by zero on the first task
+        /numSuccessfulTasks;
+  }
+
+  private void appendTaskIdSet(StringBuilder sb, // Appends the ids as "[ a,b ]"; a null set renders as "[  ]".
+      Set<TezTaskID> taskIDs) {
+    sb.append("[ ");
+    boolean first = true;
+    if (taskIDs != null) {
+      for (TezTaskID tezTaskID : taskIDs) {
+        if (!first) {
+          sb.append(",");
+        } else {
+          first = false;
+        }
+        sb.append(tezTaskID.toString());
+      }
+    }
+    sb.append(" ]");
+  }
+
+  @Override
+  public String toString() { // Single-line "key=value, ..." rendering of every aggregate, id sets included.
+    StringBuilder sb = new StringBuilder();
+    sb.append("firstTaskStartTime=").append(firstTaskStartTime)
+       .append(", firstTasksToStart=");
+    appendTaskIdSet(sb, firstTasksToStart);
+    sb.append(", lastTaskFinishTime=").append(lastTaskFinishTime)
+       .append(", lastTasksToFinish=");
+    appendTaskIdSet(sb, lastTasksToFinish);
+    sb.append(", minTaskDuration=").append(minTaskDuration)
+       .append(", maxTaskDuration=").append(maxTaskDuration)
+       .append(", avgTaskDuration=").append(avgTaskDuration)
+       .append(", numSuccessfulTasks=").append(numSuccessfulTasks)
+       .append(", shortestDurationTasks=");
+    appendTaskIdSet(sb, shortestDurationTasks);
+    sb.append(", longestDurationTasks=");
+    appendTaskIdSet(sb, longestDurationTasks);
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 6f07c91..8321a38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -53,11 +54,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   private String diagnostics;
   private TezCounters tezCounters;
   private boolean fromSummary = false;
+  private VertexStats vertexStats;
 
   public VertexFinishedEvent(TezVertexID vertexId,
-      String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
-      VertexState state, String diagnostics,
-      TezCounters counters) {
+      String vertexName, long initRequestedTime, long initedTime,
+      long startRequestedTime, long startedTime, long finishTime,
+      VertexState state, String diagnostics, TezCounters counters,
+      VertexStats vertexStats) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
@@ -67,7 +70,8 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     this.finishTime = finishTime;
     this.state = state;
     this.diagnostics = diagnostics;
-    tezCounters = counters;
+    this.tezCounters = counters;
+    this.vertexStats = vertexStats;
   }
 
   public VertexFinishedEvent() {
@@ -169,7 +173,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
         + ", diagnostics=" + diagnostics
         + ", counters=" + ( tezCounters == null ? "null" :
           tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " "))
+        + ", vertexStats=" + (vertexStats == null ? "null"
+              : vertexStats.toString());
   }
 
   public TezVertexID getVertexID() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index 8f680ce..4050df3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -57,6 +57,7 @@ public class ATSConstants {
   public static final String STATUS = "status";
   public static final String DIAGNOSTICS = "diagnostics";
   public static final String COUNTERS = "counters";
+  public static final String STATS = "stats";
   public static final String NUM_TASKS = "numTasks";
   public static final String PROCESSOR_CLASS_NAME = "processorClassName";
   public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 782fff1..b8ea673 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,10 +18,14 @@
 
 package org.apache.tez.dag.history.utils;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -149,4 +153,52 @@ public class DAGUtils {
     return jsonObject;
   }
 
+  public static Map<String,Object> convertVertexStatsToATSMap( // Flattens a VertexStats into an insertion-ordered map of numbers and strings.
+      VertexStats vertexStats) {
+    Map<String,Object> vertexStatsMap = new LinkedHashMap<String, Object>();
+    if (vertexStats == null) { // no stats available: hand back an empty map rather than null
+      return vertexStatsMap;
+    }
+
+    final String FIRST_TASK_START_TIME_KEY = "firstTaskStartTime"; // map keys, declared locally
+    final String FIRST_TASK_TO_START_KEY = "firstTaskToStart";
+    final String LAST_TASK_FINISH_TIME_KEY = "lastTaskFinishTime";
+    final String LAST_TASK_TO_FINISH_KEY = "lastTaskToFinish";
+
+    final String MIN_TASK_DURATION = "minTaskDuration";
+    final String MAX_TASK_DURATION = "maxTaskDuration";
+    final String AVG_TASK_DURATION = "avgTaskDuration";
+
+    final String SHORTEST_DURATION_TASK = "shortestDurationTask";
+    final String LONGEST_DURATION_TASK = "longestDurationTask";
+
+    vertexStatsMap.put(FIRST_TASK_START_TIME_KEY, vertexStats.getFirstTaskStartTime()); // autoboxed to Long
+    if (vertexStats.getFirstTaskToStart() != null) { // the VertexStats getter wraps its set in an unmodifiable view, so this holds for that class as written
+      vertexStatsMap.put(FIRST_TASK_TO_START_KEY,
+          vertexStats.getFirstTaskToStart().toString()); // stored as the Set's toString() text, not as a collection
+    }
+    vertexStatsMap.put(LAST_TASK_FINISH_TIME_KEY, vertexStats.getLastTaskFinishTime());
+    if (vertexStats.getLastTaskToFinish() != null) {
+      vertexStatsMap.put(LAST_TASK_TO_FINISH_KEY,
+          vertexStats.getLastTaskToFinish().toString());
+    }
+
+    vertexStatsMap.put(MIN_TASK_DURATION, vertexStats.getMinTaskDuration());
+    vertexStatsMap.put(MAX_TASK_DURATION, vertexStats.getMaxTaskDuration());
+    vertexStatsMap.put(AVG_TASK_DURATION, vertexStats.getAvgTaskDuration()); // autoboxed to Double
+
+    if (vertexStats.getShortestDurationTask() != null) {
+      vertexStatsMap.put(SHORTEST_DURATION_TASK,
+          vertexStats.getShortestDurationTask().toString());
+    }
+    if (vertexStats.getLongestDurationTask() != null) {
+      vertexStatsMap.put(LONGEST_DURATION_TASK,
+          vertexStats.getLongestDurationTask().toString());
+    }
+
+    return vertexStatsMap;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java
new file mode 100644
index 0000000..47b6366
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java
@@ -0,0 +1,159 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestVertexStats { // Exercises VertexStats.updateStats with a fixed sequence of six task reports.
+
+  @Test
+  public void testBasicStats() { // Checks every aggregate and tie set after each successive report.
+    VertexStats stats = new VertexStats();
+    Assert.assertEquals(-1, stats.firstTaskStartTime); // a fresh instance reports -1 / empty everywhere
+    Assert.assertEquals(-1, stats.lastTaskFinishTime);
+    Assert.assertEquals(-1, stats.minTaskDuration);
+    Assert.assertEquals(-1, stats.maxTaskDuration);
+    Assert.assertTrue(-1 == stats.avgTaskDuration);
+    Assert.assertEquals(0, stats.firstTasksToStart.size());
+    Assert.assertEquals(0, stats.lastTasksToFinish.size());
+    Assert.assertEquals(0, stats.shortestDurationTasks.size());
+    Assert.assertEquals(0, stats.longestDurationTasks.size());
+
+    TezVertexID tezVertexID = TezVertexID.getInstance(
+        TezDAGID.getInstance(
+            ApplicationId.newInstance(100l, 1), 1), 1);
+    TezTaskID tezTaskID1 = TezTaskID.getInstance(tezVertexID, 1);
+    TezTaskID tezTaskID2 = TezTaskID.getInstance(tezVertexID, 2);
+    TezTaskID tezTaskID3 = TezTaskID.getInstance(tezVertexID, 3);
+    TezTaskID tezTaskID4 = TezTaskID.getInstance(tezVertexID, 4);
+    TezTaskID tezTaskID5 = TezTaskID.getInstance(tezVertexID, 5);
+    TezTaskID tezTaskID6 = TezTaskID.getInstance(tezVertexID, 6);
+
+    stats.updateStats(new TaskReportImpl(tezTaskID1,
+        TaskState.SUCCEEDED, 1, 100, 200)); // first report: every aggregate is seeded from it (duration 100)
+    Assert.assertEquals(100, stats.firstTaskStartTime);
+    Assert.assertEquals(200, stats.lastTaskFinishTime);
+    Assert.assertEquals(100, stats.minTaskDuration);
+    Assert.assertEquals(100, stats.maxTaskDuration);
+    Assert.assertTrue(100 == stats.avgTaskDuration);
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID1));
+    Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID1));
+    Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID1));
+    Assert.assertEquals(1, stats.firstTasksToStart.size());
+    Assert.assertEquals(1, stats.lastTasksToFinish.size());
+    Assert.assertEquals(1, stats.shortestDurationTasks.size());
+    Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+    stats.updateStats(new TaskReportImpl(tezTaskID2,
+        TaskState.FAILED, 1, 150, 300)); // failed task: moves the last finish time but is excluded from duration figures
+    Assert.assertEquals(100, stats.firstTaskStartTime);
+    Assert.assertEquals(300, stats.lastTaskFinishTime);
+    Assert.assertEquals(100, stats.minTaskDuration);
+    Assert.assertEquals(100, stats.maxTaskDuration);
+    Assert.assertTrue(100 == stats.avgTaskDuration);
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID1));
+    Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID2));
+    Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID1));
+    Assert.assertEquals(1, stats.firstTasksToStart.size());
+    Assert.assertEquals(1, stats.lastTasksToFinish.size());
+    Assert.assertEquals(1, stats.shortestDurationTasks.size());
+    Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+    stats.updateStats(new TaskReportImpl(tezTaskID3,
+        TaskState.RUNNING, 1, 50, 550)); // another non-SUCCEEDED report: new earliest start and latest finish, durations untouched
+    Assert.assertEquals(50, stats.firstTaskStartTime);
+    Assert.assertEquals(550, stats.lastTaskFinishTime);
+    Assert.assertEquals(100, stats.minTaskDuration);
+    Assert.assertEquals(100, stats.maxTaskDuration);
+    Assert.assertTrue(100 == stats.avgTaskDuration);
+    Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID1));
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+    Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+    Assert.assertEquals(1, stats.firstTasksToStart.size());
+    Assert.assertEquals(1, stats.lastTasksToFinish.size());
+    Assert.assertEquals(1, stats.shortestDurationTasks.size());
+    Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+    stats.updateStats(new TaskReportImpl(tezTaskID4,
+        TaskState.SUCCEEDED, 1, 50, 450)); // ties task 3 on start time; duration 400 is the new max; average (100+400)/2
+    Assert.assertEquals(50, stats.firstTaskStartTime);
+    Assert.assertEquals(550, stats.lastTaskFinishTime);
+    Assert.assertEquals(100, stats.minTaskDuration);
+    Assert.assertEquals(400, stats.maxTaskDuration);
+    Assert.assertTrue(250 == stats.avgTaskDuration);
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID4));
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+    Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+    Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID4));
+    Assert.assertEquals(2, stats.firstTasksToStart.size());
+    Assert.assertEquals(1, stats.lastTasksToFinish.size());
+    Assert.assertEquals(1, stats.shortestDurationTasks.size());
+    Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+    stats.updateStats(new TaskReportImpl(tezTaskID5,
+        TaskState.SUCCEEDED, 1, 50, 450)); // ties on start time and on max duration; average (100+400+400)/3
+    Assert.assertEquals(50, stats.firstTaskStartTime);
+    Assert.assertEquals(550, stats.lastTaskFinishTime);
+    Assert.assertEquals(100, stats.minTaskDuration);
+    Assert.assertEquals(400, stats.maxTaskDuration);
+    Assert.assertTrue(300 == stats.avgTaskDuration);
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID5));
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID4));
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+    Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+    Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID4));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID5));
+    Assert.assertEquals(3, stats.firstTasksToStart.size());
+    Assert.assertEquals(1, stats.lastTasksToFinish.size());
+    Assert.assertEquals(1, stats.shortestDurationTasks.size());
+    Assert.assertEquals(2, stats.longestDurationTasks.size());
+
+    stats.updateStats(new TaskReportImpl(tezTaskID6,
+        TaskState.SUCCEEDED, 1, 450, 550)); // ties on last finish (550) and on min duration (100); average 1000/4
+    Assert.assertEquals(50, stats.firstTaskStartTime);
+    Assert.assertEquals(550, stats.lastTaskFinishTime);
+    Assert.assertEquals(100, stats.minTaskDuration);
+    Assert.assertEquals(400, stats.maxTaskDuration);
+    Assert.assertTrue(250 == stats.avgTaskDuration);
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID5));
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID4));
+    Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+    Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+    Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID6));
+    Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+    Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID6));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID4));
+    Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID5));
+    Assert.assertEquals(3, stats.firstTasksToStart.size());
+    Assert.assertEquals(2, stats.lastTasksToFinish.size());
+    Assert.assertEquals(2, stats.shortestDurationTasks.size());
+    Assert.assertEquals(2, stats.longestDurationTasks.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 2cf3eaf..b9cae70 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -33,6 +33,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -312,7 +313,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              null, null);
+              null, null, null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -328,7 +329,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              "diagnose", new TezCounters());
+              "diagnose", new TezCounters(), new VertexStats());
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());