You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2015/12/07 23:42:57 UTC

[4/5] tez git commit: TEZ-2973. Backport Analyzers to branch-0.7

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
new file mode 100644
index 0000000..5fb760c
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -0,0 +1,586 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import org.apache.commons.collections.BidiMap;
+import org.apache.commons.collections.bidimap.DualHashBidiMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.client.CallerContext;
+import org.apache.tez.dag.api.event.VertexState;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class DagInfo extends BaseInfo {
+
+  // Data model for a single DAG parsed from its history JSON entity. The constructor reads the
+  // DAG-level attributes plus the edges/vertices declared in the DAG plan; VertexInfo objects,
+  // container mappings and version info are attached afterwards through the package-private
+  // addVertexInfo(), addContainerMapping() and setVersionInfo() methods.
+
+  private static final Log LOG = LogFactory.getLog(DagInfo.class);
+
+  //Fields populated via JSON
+  private final String name;
+  private final long startTime;
+  private final long endTime;
+  private final long submitTime;
+  private final int failedTasks;
+  private final String dagId;
+  private final int numVertices;
+  private final String status;
+  private final String diagnostics;
+  private VersionInfo versionInfo;
+  private CallerContext callerContext;
+
+  //VertexName <--> VertexID (bidirectional)
+  private final BidiMap vertexNameIDMapping;
+
+  //edgeId to EdgeInfo mapping
+  private final Map<Integer, EdgeInfo> edgeInfoMap;
+
+  //Only for internal parsing (vertexname mapping)
+  private final Map<String, BasicVertexInfo> basicVertexInfoMap;
+
+  //VertexName --> VertexInfo
+  private final Map<String, VertexInfo> vertexNameMap;
+
+  //Container --> task attempts registered via addContainerMapping()
+  private final Multimap<Container, TaskAttemptInfo> containerMapping;
+
+  /**
+   * Parses DAG-level details from a DAG entity JSON object.
+   *
+   * @param jsonObject entity JSON whose entity type must be {@code Constants.TEZ_DAG_ID}
+   * @throws JSONException if the entity type, entity id or other-info node is missing
+   * @throws IllegalArgumentException if the entity is not a DAG entity
+   */
+  DagInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    vertexNameMap = Maps.newHashMap();
+    vertexNameIDMapping = new DualHashBidiMap();
+    edgeInfoMap = Maps.newHashMap();
+    basicVertexInfoMap = Maps.newHashMap();
+    containerMapping = LinkedHashMultimap.create();
+
+    Preconditions.checkArgument(jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+        (Constants.TEZ_DAG_ID));
+
+    dagId = StringInterner.weakIntern(jsonObject.getString(Constants.ENTITY));
+
+    //Parse additional Info
+    JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    //TODO: Not getting populated correctly for lots of jobs.  Verify
+    submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
+    JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN);
+    name = StringInterner.weakIntern((dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null);
+    if (dagPlan != null) {
+      JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES);
+      numVertices = (vertices != null) ? vertices.length() : 0;
+      parseDAGPlan(dagPlan);
+    } else {
+      numVertices = 0;
+    }
+    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+
+    //parse name id mapping (vertexName --> vertexId)
+    JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING);
+    if (vertexIDMappingJson != null) {
+      for (String vertexName : basicVertexInfoMap.keySet()) {
+        String vertexId = vertexIDMappingJson.optString(vertexName);
+        if (vertexId == null || vertexId.isEmpty()) {
+          // optString() yields "" for names absent from the mapping. Storing "" for several
+          // vertices would make the bidi map evict all but the last of them, so skip them.
+          LOG.info("No vertex id available for vertex " + vertexName);
+          continue;
+        }
+        vertexNameIDMapping.put(vertexName, vertexId);
+      }
+    }
+  }
+
+  /**
+   * Factory for {@link DagInfo}.
+   *
+   * @param jsonObject DAG entity JSON
+   * @return parsed DagInfo
+   * @throws JSONException if mandatory fields are missing
+   */
+  public static DagInfo create(JSONObject jsonObject) throws JSONException {
+    return new DagInfo(jsonObject);
+  }
+
+  /**
+   * Parses edges, basic vertex info and (for plan version > 1) the caller context.
+   */
+  private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
+    int version = dagPlan.optInt(Constants.VERSION, 1);
+    // Edges are parsed first so they are available when vertices are attached later.
+    parseEdges(dagPlan.optJSONArray(Constants.EDGES));
+
+    JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES);
+    parseBasicVertexInfo(verticesInfo);
+
+    if (version > 1) {
+      parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT));
+    }
+  }
+
+  /**
+   * Builds the {@link CallerContext} from the DAG plan's context node, if present.
+   * Caller id/type are only set when both are non-empty.
+   */
+  private void parseDAGContext(JSONObject callerContextInfo) {
+    if (callerContextInfo == null) {
+      LOG.info("No DAG Caller Context available");
+      return;
+    }
+    String context = callerContextInfo.optString(Constants.CONTEXT);
+    String callerId = callerContextInfo.optString(Constants.CALLER_ID);
+    String callerType = callerContextInfo.optString(Constants.CALLER_TYPE);
+    String description = callerContextInfo.optString(Constants.DESCRIPTION);
+
+    this.callerContext = CallerContext.create(context, description);
+    if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) {
+      this.callerContext.setCallerIdAndType(callerId, callerType);
+    } else {
+      LOG.info("No DAG Caller Context Id and Type available");
+    }
+  }
+
+  /**
+   * Records, per vertex name, the in/out edge ids and additional inputs/outputs declared in
+   * the DAG plan so that {@link #addVertexInfo(VertexInfo)} can wire them up later.
+   */
+  private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
+    if (verticesInfo == null) {
+      LOG.info("No vertices available.");
+      return;
+    }
+
+    //Parse basic information available in DAG for vertex and edges
+    for (int i = 0; i < verticesInfo.length(); i++) {
+      BasicVertexInfo basicVertexInfo = new BasicVertexInfo();
+
+      JSONObject vJson = verticesInfo.getJSONObject(i);
+      basicVertexInfo.vertexName = vJson.optString(Constants.VERTEX_NAME);
+      basicVertexInfo.inEdgeIds = toStringArray(vJson.optJSONArray(Constants.IN_EDGE_IDS));
+      basicVertexInfo.outEdgeIds = toStringArray(vJson.optJSONArray(Constants.OUT_EDGE_IDS));
+      basicVertexInfo.additionalInputs =
+          parseAdditionalDetailsForVertex(vJson.optJSONArray(Constants.ADDITIONAL_INPUTS));
+      basicVertexInfo.additionalOutputs =
+          parseAdditionalDetailsForVertex(vJson.optJSONArray(Constants.ADDITIONAL_OUTPUTS));
+
+      basicVertexInfoMap.put(basicVertexInfo.vertexName, basicVertexInfo);
+    }
+  }
+
+  /** Converts each array element to its string form; returns null for a null array. */
+  private static String[] toStringArray(JSONArray array) throws JSONException {
+    if (array == null) {
+      return null;
+    }
+    String[] result = new String[array.length()];
+    for (int j = 0; j < array.length(); j++) {
+      result[j] = array.get(j).toString();
+    }
+    return result;
+  }
+
+  /**
+   * Get additional input/output details declared for a vertex in the DAG plan.
+   *
+   * @param jsonArray additional inputs or outputs array (may be null)
+   * @return AdditionalInputOutputDetails[], or null when jsonArray is null
+   * @throws JSONException if an element is not a JSON object
+   */
+  private AdditionalInputOutputDetails[] parseAdditionalDetailsForVertex(JSONArray jsonArray) throws
+      JSONException {
+    if (jsonArray == null) {
+      return null;
+    }
+    AdditionalInputOutputDetails[] additionalInputOutputDetails =
+        new AdditionalInputOutputDetails[jsonArray.length()];
+    for (int j = 0; j < jsonArray.length(); j++) {
+      JSONObject details = jsonArray.getJSONObject(j);
+      String name = details.optString(Constants.NAME);
+      String clazz = details.optString(Constants.CLASS);
+      String initializer = details.optString(Constants.INITIALIZER);
+      String userPayloadText = details.optString(Constants.USER_PAYLOAD_TEXT);
+
+      additionalInputOutputDetails[j] =
+          new AdditionalInputOutputDetails(name, clazz, initializer, userPayloadText);
+    }
+    return additionalInputOutputDetails;
+  }
+
+  /**
+   * Parse edge details in the DAG plan into {@code edgeInfoMap}, keyed by edge id.
+   *
+   * @param edgesArray edges array from the DAG plan (may be null)
+   * @throws JSONException if an element is not a JSON object
+   */
+  private void parseEdges(JSONArray edgesArray) throws JSONException {
+    if (edgesArray == null) {
+      return;
+    }
+    for (int i = 0; i < edgesArray.length(); i++) {
+      JSONObject edge = edgesArray.getJSONObject(i);
+      Integer edgeId = edge.optInt(Constants.EDGE_ID);
+      EdgeInfo edgeInfo = new EdgeInfo(
+          edge.optString(Constants.INPUT_VERTEX_NAME),
+          edge.optString(Constants.OUTPUT_VERTEX_NAME),
+          edge.optString(Constants.DATA_MOVEMENT_TYPE),
+          edge.optString(Constants.EDGE_SOURCE_CLASS),
+          edge.optString(Constants.EDGE_DESTINATION_CLASS),
+          edge.optString(Constants.INPUT_PAYLOAD_TEXT),
+          edge.optString(Constants.OUTPUT_PAYLOAD_TEXT));
+      edgeInfoMap.put(edgeId, edgeInfo);
+    }
+  }
+
+  /** Per-vertex details taken from the DAG plan; used only while attaching VertexInfo. */
+  static class BasicVertexInfo {
+    String vertexName;
+    String[] inEdgeIds;
+    String[] outEdgeIds;
+    AdditionalInputOutputDetails[] additionalInputs;
+    AdditionalInputOutputDetails[] additionalOutputs;
+  }
+
+  /**
+   * Attaches a parsed vertex to this DAG, populating its additional inputs/outputs and
+   * in/out edges from the DAG plan.
+   *
+   * @throws IllegalArgumentException if the vertex name is not part of the DAG plan
+   * @throws IllegalStateException if an edge id referenced by the vertex is unknown
+   */
+  void addVertexInfo(VertexInfo vertexInfo) {
+    String vertexName = vertexInfo.getVertexName();
+    BasicVertexInfo basicVertexInfo = basicVertexInfoMap.get(vertexName);
+
+    // Template form defers building the (potentially large) message until the check fails.
+    Preconditions.checkArgument(basicVertexInfo != null,
+        "VertexName %s not present in DAG's vertices %s", vertexName,
+        basicVertexInfoMap.keySet());
+
+    //populate additional information in VertexInfo
+    if (basicVertexInfo.additionalInputs != null) {
+      vertexInfo.setAdditionalInputInfoList(Arrays.asList(basicVertexInfo.additionalInputs));
+    }
+    if (basicVertexInfo.additionalOutputs != null) {
+      vertexInfo.setAdditionalOutputInfoList(Arrays.asList(basicVertexInfo.additionalOutputs));
+    }
+
+    //Populate edge information in vertex
+    if (basicVertexInfo.inEdgeIds != null) {
+      for (String edge : basicVertexInfo.inEdgeIds) {
+        vertexInfo.addInEdge(lookupEdge(edge));
+      }
+    }
+
+    if (basicVertexInfo.outEdgeIds != null) {
+      for (String edge : basicVertexInfo.outEdgeIds) {
+        vertexInfo.addOutEdge(lookupEdge(edge));
+      }
+    }
+
+    vertexNameMap.put(vertexName, vertexInfo);
+  }
+
+  /** Resolves an edge id (as recorded in the DAG plan) to its parsed EdgeInfo. */
+  private EdgeInfo lookupEdge(String edgeId) {
+    EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edgeId));
+    Preconditions.checkState(edgeInfo != null, "EdgeId %s not present in DAG", edgeId);
+    return edgeInfo;
+  }
+
+  void setVersionInfo(VersionInfo versionInfo) {
+    this.versionInfo = versionInfo;
+  }
+
+  void addContainerMapping(Container container, TaskAttemptInfo taskAttemptInfo) {
+    this.containerMapping.put(container, taskAttemptInfo);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("dagID=").append(getDagId()).append(", ");
+    sb.append("dagName=").append(getName()).append(", ");
+    sb.append("status=").append(getStatus()).append(", ");
+    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+    sb.append("submitTime=").append(getSubmitTime()).append(", ");
+    sb.append("endTime=").append(getFinishTimeInterval()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", ");
+    sb.append("failedTasks=").append(getFailedTaskCount()).append(", ");
+    sb.append("events=").append(getEvents());
+    sb.append("]");
+    return sb.toString();
+  }
+
+  /** @return read-only view of the container to task-attempt mapping registered on this DAG */
+  public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  public final VersionInfo getVersionInfo() {
+    return versionInfo;
+  }
+
+  /** @return caller context from the DAG plan, or null if not available */
+  public final CallerContext getCallerContext() {
+    return callerContext;
+  }
+
+  public final String getName() {
+    return name;
+  }
+
+  /** @return read-only view of all edges parsed from the DAG plan */
+  public final Collection<EdgeInfo> getEdges() {
+    return Collections.unmodifiableCollection(edgeInfoMap.values());
+  }
+
+  public final long getSubmitTime() {
+    return submitTime;
+  }
+
+  public final long getStartTime() {
+    return startTime;
+  }
+
+  public final long getFinishTime() {
+    return endTime;
+  }
+
+  /**
+   * Reference start time for the DAG. Vertex, Task, TaskAttempt would map on to this.
+   * If absolute start time is needed, call getStartTime().
+   *
+   * @return starting time w.r.t to dag (always 0)
+   */
+  public final long getStartTimeInterval() {
+    return 0;
+  }
+
+  /**
+   * @return finish time relative to the DAG start. If the recorded end time precedes the start
+   *     time (e.g. the DAG did not complete), the latest vertex finish interval is used instead.
+   */
+  @Override
+  public final long getFinishTimeInterval() {
+    long dagEndTime = (endTime - startTime);
+    if (dagEndTime < 0) {
+      //probably dag is not complete or failed in middle. get the last vertex finish time
+      for (VertexInfo vertexInfo : vertexNameMap.values()) {
+        dagEndTime = Math.max(dagEndTime, vertexInfo.getFinishTimeInterval());
+      }
+    }
+    return dagEndTime;
+  }
+
+  public final long getTimeTaken() {
+    return getFinishTimeInterval();
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  /**
+   * Get vertexInfo for a given vertex id
+   *
+   * @param vertexId vertex id
+   * @return VertexInfo, or null if the id is unknown
+   */
+  public VertexInfo getVertexFromId(String vertexId) {
+    return vertexNameMap.get(vertexNameIDMapping.getKey(vertexId));
+  }
+
+  /**
+   * Get vertexInfo for a given vertex name
+   *
+   * @param vertexName vertex name
+   * @return VertexInfo, or null if no vertex with that name has been added
+   */
+  public final VertexInfo getVertex(String vertexName) {
+    return vertexNameMap.get(vertexName);
+  }
+
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  /**
+   * Get all vertices, ordered by start time interval (ascending).
+   *
+   * @return unmodifiable List<VertexInfo>
+   */
+  public final List<VertexInfo> getVertices() {
+    List<VertexInfo> vertices = Lists.newLinkedList(vertexNameMap.values());
+    Collections.sort(vertices, new Comparator<VertexInfo>() {
+      @Override public int compare(VertexInfo o1, VertexInfo o2) {
+        return Long.compare(o1.getStartTimeInterval(), o2.getStartTimeInterval());
+      }
+    });
+    return Collections.unmodifiableList(vertices);
+  }
+
+  /**
+   * Get list of failed vertices
+   *
+   * @return List<VertexInfo>
+   */
+  public final List<VertexInfo> getFailedVertices() {
+    return getVertices(VertexState.FAILED);
+  }
+
+  /**
+   * Get list of killed vertices
+   *
+   * @return List<VertexInfo>
+   */
+  public final List<VertexInfo> getKilledVertices() {
+    return getVertices(VertexState.KILLED);
+  }
+
+  /**
+   * Get list of successfully completed (SUCCEEDED) vertices
+   *
+   * @return List<VertexInfo>
+   */
+  public final List<VertexInfo> getSuccessfullVertices() {
+    return getVertices(VertexState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of vertices whose status string equals the given state's name.
+   *
+   * @param state vertex state to match
+   * @return unmodifiable List<VertexInfo>
+   */
+  public final List<VertexInfo> getVertices(final VertexState state) {
+    Iterable<VertexInfo> matching = Iterables.filter(vertexNameMap.values(),
+        new Predicate<VertexInfo>() {
+          @Override public boolean apply(VertexInfo input) {
+            return input.getStatus() != null && input.getStatus().equals(state.toString());
+          }
+        });
+    return Collections.unmodifiableList(Lists.newLinkedList(matching));
+  }
+
+  /** @return read-only view of vertex name to VertexInfo */
+  public final Map<String, VertexInfo> getVertexMapping() {
+    return Collections.unmodifiableMap(vertexNameMap);
+  }
+
+  /** Orders vertices by time taken (ascending). */
+  private Ordering<VertexInfo> getVertexOrdering() {
+    return Ordering.from(new Comparator<VertexInfo>() {
+      @Override public int compare(VertexInfo o1, VertexInfo o2) {
+        return Long.compare(o1.getTimeTaken(), o2.getTimeTaken());
+      }
+    });
+  }
+
+  /**
+   * Get the slowest vertex in the DAG
+   *
+   * @return VertexInfo with the largest time taken, or null if there are no vertices
+   */
+  public final VertexInfo getSlowestVertex() {
+    List<VertexInfo> vertexInfoList = getVertices();
+    if (vertexInfoList.isEmpty()) {
+      return null;
+    }
+    return getVertexOrdering().max(vertexInfoList);
+  }
+
+  /**
+   * Get the fastest vertex in the DAG
+   *
+   * @return VertexInfo with the smallest time taken, or null if there are no vertices
+   */
+  public final VertexInfo getFastestVertex() {
+    List<VertexInfo> vertexInfoList = getVertices();
+    if (vertexInfoList.isEmpty()) {
+      return null;
+    }
+    return getVertexOrdering().min(vertexInfoList);
+  }
+
+  /**
+   * Get node details for this DAG. Would be useful for analyzing node to tasks.
+   *
+   * @return Multimap<String, TaskAttemptInfo> taskAttempt details at every node (host)
+   */
+  public final Multimap<String, TaskAttemptInfo> getNodeDetails() {
+    Multimap<String, TaskAttemptInfo> nodeDetails = LinkedListMultimap.create();
+    for (VertexInfo vertexInfo : getVertices()) {
+      Multimap<Container, TaskAttemptInfo> vertexContainers = vertexInfo.getContainersMapping();
+      for (Map.Entry<Container, TaskAttemptInfo> entry : vertexContainers.entries()) {
+        nodeDetails.put(entry.getKey().getHost(), entry.getValue());
+      }
+    }
+    return nodeDetails;
+  }
+
+  /**
+   * Get containers used for this DAG, aggregated across all vertices.
+   *
+   * @return Multimap<Container, TaskAttemptInfo> task attempt details at every container
+   */
+  public final Multimap<Container, TaskAttemptInfo> getContainersToTaskAttemptMapping() {
+    Multimap<Container, TaskAttemptInfo> aggregated = LinkedHashMultimap.create();
+    for (VertexInfo vertexInfo : getVertices()) {
+      aggregated.putAll(vertexInfo.getContainersMapping());
+    }
+    return Multimaps.unmodifiableMultimap(aggregated);
+  }
+
+  /**
+   * @return the live (mutable) vertex name to vertex id bidirectional mapping
+   */
+  public final Map getVertexNameIDMapping() {
+    return vertexNameIDMapping;
+  }
+
+  /** @return number of vertices declared in the DAG plan (0 if no plan) */
+  public final int getNumVertices() {
+    return numVertices;
+  }
+
+  public final String getDagId() {
+    return dagId;
+  }
+
+  public final int getFailedTaskCount() {
+    return failedTasks;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
new file mode 100644
index 0000000..ab8e831
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class EdgeInfo {
+
+  // Edge details as declared in the DAG plan. The source/destination VertexInfo references are
+  // not known at construction time and are filled in later through the setters.
+
+  private final String inputVertexName;
+  private final String outputVertexName;
+  private final String dataMovementType;
+  private final String edgeSourceClass;
+  private final String edgeDestinationClass;
+  private final String inputUserPayloadAsText;
+  private final String outputUserPayloadAsText;
+
+  // May be null until set via setSourceVertex()/setDestinationVertex().
+  private VertexInfo sourceVertex;
+  private VertexInfo destinationVertex;
+
+  /**
+   * @param inputVertexName name of the vertex this edge reads from
+   * @param outputVertexName name of the vertex this edge writes to
+   * @param dataMovementType data movement type as recorded in the DAG plan
+   * @param edgeSourceClass edge source class name
+   * @param edgeDestinationClass edge destination class name
+   * @param inputUserPayloadAsText input user payload text
+   * @param outputUserPayloadAsText output user payload text
+   */
+  public EdgeInfo(String inputVertexName, String outputVertexName, String dataMovementType,
+      String edgeSourceClass, String edgeDestinationClass, String inputUserPayloadAsText, String
+      outputUserPayloadAsText) {
+    this.inputVertexName = inputVertexName;
+    this.outputVertexName = outputVertexName;
+    this.dataMovementType = dataMovementType;
+    this.edgeSourceClass = edgeSourceClass;
+    this.edgeDestinationClass = edgeDestinationClass;
+    this.inputUserPayloadAsText = inputUserPayloadAsText;
+    this.outputUserPayloadAsText = outputUserPayloadAsText;
+  }
+
+  public final String getInputVertexName() {
+    return inputVertexName;
+  }
+
+  public final String getOutputVertexName() {
+    return outputVertexName;
+  }
+
+  public final String getDataMovementType() {
+    return dataMovementType;
+  }
+
+  public final String getEdgeSourceClass() {
+    return edgeSourceClass;
+  }
+
+  public final String getEdgeDestinationClass() {
+    return edgeDestinationClass;
+  }
+
+  public final String getInputUserPayloadAsText() {
+    return inputUserPayloadAsText;
+  }
+
+  public final String getOutputUserPayloadAsText() {
+    return outputUserPayloadAsText;
+  }
+
+  /** @return source vertex, or null if not yet set */
+  public final VertexInfo getSourceVertex() {
+    return sourceVertex;
+  }
+
+  public final void setSourceVertex(VertexInfo sourceVertex) {
+    this.sourceVertex = sourceVertex;
+  }
+
+  /** @return destination vertex, or null if not yet set */
+  public final VertexInfo getDestinationVertex() {
+    return destinationVertex;
+  }
+
+  public final void setDestinationVertex(VertexInfo destinationVertex) {
+    this.destinationVertex = destinationVertex;
+  }
+
+  /** Null-safe vertex name for toString(); vertices are attached after construction. */
+  private static String vertexNameOf(VertexInfo vertex) {
+    return (vertex == null) ? null : vertex.getVertexName();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("inputVertexName=").append(inputVertexName).append(", ");
+    sb.append("outputVertexName=").append(outputVertexName).append(", ");
+    sb.append("dataMovementType=").append(dataMovementType).append(", ");
+    sb.append("edgeSourceClass=").append(edgeSourceClass).append(", ");
+    sb.append("edgeDestinationClass=").append(edgeDestinationClass).append(", ");
+    sb.append("inputUserPayloadAsText=").append(inputUserPayloadAsText).append(", ");
+    sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText).append(", ");
+    sb.append("sourceVertex=").append(vertexNameOf(sourceVertex)).append(", ");
+    sb.append("destinationVertex=").append(vertexNameOf(destinationVertex));
+    sb.append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
new file mode 100644
index 0000000..70310f3
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class Event {
+
+  // Values exactly as recorded in the history data.
+  private final String info;
+  private final String type;
+  private final long time;
+
+  // Offset subtracted by getTime(); typically the dag start time. Stays 0 until set.
+  private long refTime;
+
+  /**
+   * @param info free-form event information
+   * @param type event type
+   * @param time absolute event timestamp
+   */
+  public Event(String info, String type, long time) {
+    this.info = info;
+    this.type = type;
+    this.time = time;
+  }
+
+  /** Sets the reference point that {@link #getTime()} is measured from. */
+  void setReferenceTime(long refTime) {
+    this.refTime = refTime;
+  }
+
+  public final String getInfo() {
+    return info;
+  }
+
+  public final String getType() {
+    return type;
+  }
+
+  /** @return the timestamp as recorded, without any reference-time adjustment */
+  public final long getAbsoluteTime() {
+    return time;
+  }
+
+  /** @return the timestamp relative to the reference time */
+  public final long getTime() {
+    final long relative = time - refTime;
+    return relative;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("[info=").append(info)
+        .append(", type=").append(type)
+        .append(", time=").append(time)
+        .append("]")
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
new file mode 100644
index 0000000..d373513
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+
+@Public
+@Evolving
+public class TaskAttemptInfo extends BaseInfo {
+
+  private static final String SUCCEEDED = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
+
+  private final String taskAttemptId;
+  private final long startTime;
+  private final long endTime;
+  private final String diagnostics;
+
+  private final long creationTime;
+  private final long allocationTime;
+  private final String containerId;
+  private final String nodeId;
+  private final String status;
+  private final String logUrl;
+  private final String creationCausalTA;
+  private final String terminationCause;
+  private final long executionTimeInterval;
+  // this list is in time order - array list for easy walking
+  private final ArrayList<DataDependencyEvent> lastDataEvents = Lists.newArrayList();
+
+  private TaskInfo taskInfo;
+
+  // assigned once in the constructor and never replaced
+  private final Container container;
+
+  /**
+   * A (task attempt id, timestamp) pair parsed from the attempt's LAST_DATA_EVENTS info.
+   */
+  public static class DataDependencyEvent {
+    String taId;
+    long timestamp;
+    public DataDependencyEvent(String id, long time) {
+      taId = id;
+      timestamp = time;
+    }
+    public long getTimestamp() {
+      return timestamp;
+    }
+    public String getTaskAttemptId() {
+      return taId;
+    }
+  }
+
+  /**
+   * Parses a task attempt entity.
+   *
+   * @param jsonObject entity whose ENTITY_TYPE must be TEZ_TASK_ATTEMPT_ID
+   * @throws JSONException if the OTHER_INFO node or the entity type is missing
+   * @throws IllegalArgumentException if the entity is not a task attempt
+   * @throws IllegalStateException if the data events are not in strictly increasing time order
+   */
+  TaskAttemptInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+            (Constants.TEZ_TASK_ATTEMPT_ID));
+
+    taskAttemptId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
+
+    //Parse additional Info
+    final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    creationTime = otherInfoNode.optLong(Constants.CREATION_TIME);
+    creationCausalTA = StringInterner.weakIntern(
+        otherInfoNode.optString(Constants.CREATION_CAUSAL_ATTEMPT));
+    allocationTime = otherInfoNode.optLong(Constants.ALLOCATION_TIME);
+    containerId = StringInterner.weakIntern(otherInfoNode.optString(Constants.CONTAINER_ID));
+    String id = otherInfoNode.optString(Constants.NODE_ID);
+    // keep only the host part of "host:port"
+    nodeId = StringInterner.weakIntern((id != null) ? (id.split(":")[0]) : "");
+    logUrl = otherInfoNode.optString(Constants.COMPLETED_LOGS_URL);
+
+    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+    container = new Container(containerId, nodeId);
+    if (otherInfoNode.has(Constants.LAST_DATA_EVENTS)) {
+      List<DataDependencyEvent> eventInfo = Utils.parseDataEventDependencyFromJSON(
+          otherInfoNode.optJSONObject(Constants.LAST_DATA_EVENTS));
+      long lastTime = 0;
+      for (DataDependencyEvent item : eventInfo) {
+        // check these are in time order; getLastDataEventInfo() relies on it
+        Preconditions.checkState(lastTime < item.getTimestamp(),
+            "Data events of %s are not in increasing time order: %s after %s",
+            taskAttemptId, item.getTimestamp(), lastTime);
+        lastTime = item.getTimestamp();
+        lastDataEvents.add(item);
+      }
+    }
+    terminationCause = StringInterner
+        .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
+    executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0;
+  }
+
+  /** @return an ordering of attempts by ascending allocation time */
+  public static Ordering<TaskAttemptInfo> orderingOnAllocationTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override
+      public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getAllocationTime() < o2.getAllocationTime() ? -1
+            : o1.getAllocationTime() > o2.getAllocationTime() ? 1 : 0);
+      }
+    });
+  }
+
+  /** Links this attempt to its task and registers it with that task. */
+  void setTaskInfo(TaskInfo taskInfo) {
+    Preconditions.checkArgument(taskInfo != null, "Provide valid taskInfo");
+    this.taskInfo = taskInfo;
+    taskInfo.addTaskAttemptInfo(this);
+  }
+
+  /** @return start time relative to the DAG start time */
+  @Override
+  public final long getStartTimeInterval() {
+    return startTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+  }
+
+  /** @return finish time relative to the DAG start time */
+  @Override
+  public final long getFinishTimeInterval() {
+    return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+  }
+
+  /** @return true if the recorded status is SUCCEEDED */
+  public final boolean isSucceeded() {
+    return status.equals(SUCCEEDED);
+  }
+
+  /** @return data dependency events in increasing time order */
+  public final List<DataDependencyEvent> getLastDataEvents() {
+    return lastDataEvents;
+  }
+
+  /** @return finish minus start time, or 0 if finish is not after start */
+  public final long getExecutionTimeInterval() {
+    return executionTimeInterval;
+  }
+
+  /**
+   * @return time from the later of the start time and the last data event to the finish time,
+   *     or -1 if the start or finish time is not positive
+   */
+  public final long getPostDataExecutionTimeInterval() {
+    if (getStartTime() > 0 && getFinishTime() > 0) {
+      // start time defaults to the actual start time
+      long postDataStartTime = startTime;
+      if (getLastDataEvents() != null && !getLastDataEvents().isEmpty()) {
+        // if last data event is after the start time then use last data event time
+        long lastEventTime = getLastDataEvents().get(getLastDataEvents().size()-1).getTimestamp();
+        postDataStartTime = startTime > lastEventTime ? startTime : lastEventTime;
+      }
+      return (getFinishTime() - postDataStartTime);
+    }
+    return -1;
+  }
+
+  public final long getAllocationToEndTimeInterval() {
+    return (endTime - allocationTime);
+  }
+
+  public final long getAllocationToStartTimeInterval() {
+    return (startTime - allocationTime);
+  }
+
+  public final long getCreationToAllocationTimeInterval() {
+    return (allocationTime - creationTime);
+  }
+
+  public final long getStartTime() {
+    return startTime;
+  }
+
+  public final long getFinishTime() {
+    return endTime;
+  }
+
+  public final long getCreationTime() {
+    return creationTime;
+  }
+
+  /**
+   * @param timeThreshold exclusive upper bound on the event timestamp
+   * @return the latest data event strictly before {@code timeThreshold}, or null if none
+   */
+  public final DataDependencyEvent getLastDataEventInfo(long timeThreshold) {
+    for (int i=lastDataEvents.size()-1; i>=0; i--) {
+      // walk back in time until we get first event that happened before the threshold
+      DataDependencyEvent item = lastDataEvents.get(i);
+      if (item.getTimestamp() < timeThreshold) {
+        return item;
+      }
+    }
+    return null;
+  }
+
+  public final long getTimeTaken() {
+    return getFinishTimeInterval() - getStartTimeInterval();
+  }
+
+  /** @return creation time relative to the DAG start time */
+  public final long getCreationTimeInterval() {
+    return creationTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+  }
+
+  public final String getCreationCausalTA() {
+    return creationCausalTA;
+  }
+
+  public final long getAllocationTime() {
+    return allocationTime;
+  }
+
+  /** @return "vertexName : " followed by the last two '_'-separated parts of the attempt id */
+  public final String getShortName() {
+    return getTaskInfo().getVertexInfo().getVertexName() + " : " + 
+    taskAttemptId.substring(taskAttemptId.lastIndexOf('_', taskAttemptId.lastIndexOf('_') - 1) + 1);
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public final String getTerminationCause() {
+    return terminationCause;
+  }
+
+  public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException {
+    return new TaskAttemptInfo(taskInfoObject);
+  }
+
+  /** @return true if any of the DATA/RACK/OTHER_LOCAL_TASKS DAG counters are present */
+  public final boolean isLocalityInfoAvailable() {
+    Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.DATA_LOCAL_TASKS.toString());
+    Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.RACK_LOCAL_TASKS.toString());
+
+    Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.OTHER_LOCAL_TASKS.toString());
+
+    return !dataLocalTask.isEmpty() || !rackLocalTask.isEmpty() || !otherLocalTask.isEmpty();
+  }
+
+  /** @return the status, suffixed with ":" and the termination cause when one is recorded */
+  public final String getDetailedStatus() {
+    if (!Strings.isNullOrEmpty(getTerminationCause())) {
+      return getStatus() + ":" + getTerminationCause();
+    }
+    return getStatus();
+  }
+
+  /**
+   * @return the first available of the DATA_LOCAL, RACK_LOCAL and OTHER_LOCAL task counters
+   *     (checked in that order), or null if none is present
+   */
+  public final TezCounter getLocalityInfo() {
+    Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.DATA_LOCAL_TASKS.toString());
+    Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.RACK_LOCAL_TASKS.toString());
+    Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.OTHER_LOCAL_TASKS.toString());
+
+    if (!dataLocalTask.isEmpty()) {
+      return dataLocalTask.get(DAGCounter.class.getName());
+    }
+
+    if (!rackLocalTask.isEmpty()) {
+      return rackLocalTask.get(DAGCounter.class.getName());
+    }
+
+    if (!otherLocalTask.isEmpty()) {
+      return otherLocalTask.get(DAGCounter.class.getName());
+    }
+    return null;
+  }
+
+  public final TaskInfo getTaskInfo() {
+    return taskInfo;
+  }
+
+  public final String getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  /** @return host part of the node id */
+  public final String getNodeId() {
+    return nodeId;
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final Container getContainer() {
+    return container;
+  }
+
+  public final String getLogURL() {
+    return logUrl;
+  }
+
+  /**
+   * Get merge counter per source. Available in case of reducer task
+   *
+   * @return Map<String, TezCounter> merge phase time at every counter group level
+   */
+  public final Map<String, TezCounter> getMergePhaseTime() {
+    return getCounter(null, TaskCounter.MERGE_PHASE_TIME.name());
+  }
+
+  /**
+   * Get shuffle counter per source. Available in case of shuffle
+   *
+   * @return Map<String, TezCounter> shuffle phase time at every counter group level
+   */
+  public final Map<String, TezCounter> getShufflePhaseTime() {
+    return getCounter(null, TaskCounter.SHUFFLE_PHASE_TIME.name());
+  }
+
+  /**
+   * Get OUTPUT_BYTES counter per source. Available in case of map outputs
+   *
+   * @return Map<String, TezCounter> output bytes counter at every counter group
+   */
+  public final Map<String, TezCounter> getTaskOutputBytes() {
+    return getCounter(null, TaskCounter.OUTPUT_BYTES.name());
+  }
+
+  /**
+   * Get number of spills per source.  (SPILLED_RECORDS / OUTPUT_RECORDS)
+   *
+   * Sources that report SPILLED_RECORDS but no OUTPUT_RECORDS are omitted, since no ratio can
+   * be formed for them.
+   *
+   * @return Map<String, Float> spill ratio per counter group
+   */
+  public final Map<String, Float> getSpillCount() {
+    Map<String, TezCounter> outputRecords = getCounter(null, "OUTPUT_RECORDS");
+    Map<String, TezCounter> spilledRecords = getCounter(null, "SPILLED_RECORDS");
+    Map<String, Float> result = Maps.newHashMap();
+    for (Map.Entry<String, TezCounter> entry : spilledRecords.entrySet()) {
+      String source = entry.getKey();
+      TezCounter outputCounter = outputRecords.get(source);
+      if (outputCounter == null) {
+        // previously an NPE; a group may carry SPILLED_RECORDS without OUTPUT_RECORDS
+        continue;
+      }
+      long spilledVal = entry.getValue().getValue();
+      long outputVal = outputCounter.getValue();
+      result.put(source, (spilledVal * 1.0f) / (outputVal * 1.0f));
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", ");
+    sb.append("creationTime=").append(getCreationTimeInterval()).append(", ");
+    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+    sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("container=").append(getContainer()).append(", ");
+    sb.append("nodeId=").append(getNodeId()).append(", ");
+    sb.append("logURL=").append(getLogURL()).append(", ");
+    sb.append("status=").append(getStatus());
+    sb.append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
new file mode 100644
index 0000000..c6f89d6
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class TaskInfo extends BaseInfo {
+
+  private final long startTime;
+  private final long endTime;
+  private final String diagnostics;
+  private final String successfulAttemptId;
+  private final long scheduledTime;
+  private final String status;
+  private final String taskId;
+
+  private VertexInfo vertexInfo;
+
+  // keyed by task attempt id; populated through addTaskAttemptInfo()
+  private final Map<String, TaskAttemptInfo> attemptInfoMap = Maps
+      .newHashMap();
+
+  /**
+   * Parses a task entity.
+   *
+   * @param jsonObject entity whose ENTITY_TYPE must be TEZ_TASK_ID
+   * @throws JSONException if the OTHER_INFO node or the entity type is missing
+   * @throws IllegalArgumentException if the entity is not a task
+   */
+  TaskInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+            (Constants.TEZ_TASK_ID));
+
+    taskId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
+
+    //Parse additional Info
+    final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    successfulAttemptId = StringInterner.weakIntern(
+        otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID));
+    scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
+    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+  }
+
+  /** @return start time relative to the DAG start time */
+  @Override
+  public final long getStartTimeInterval() {
+    return startTime - (vertexInfo.getDagInfo().getStartTime());
+  }
+
+  public final long getStartTime() {
+    return startTime;
+  }
+
+  public final long getFinishTime() {
+    return endTime;
+  }
+
+  /**
+   * @return finish time relative to the DAG start time; if that is negative (no finish time
+   *     recorded), the latest relative finish time among the attempts, when larger
+   */
+  @Override
+  public final long getFinishTimeInterval() {
+    long taskFinishTime =  endTime - (vertexInfo.getDagInfo().getStartTime());
+    if (taskFinishTime < 0) {
+      //probably vertex is not complete or failed in middle. get the last task attempt time
+      for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+        taskFinishTime = Math.max(taskFinishTime, attemptInfo.getFinishTimeInterval());
+      }
+    }
+    return taskFinishTime;
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public static TaskInfo create(JSONObject taskInfoObject) throws
+      JSONException {
+    return new TaskInfo(taskInfoObject);
+  }
+
+  /** Registers an attempt under its attempt id, replacing any previous entry with that id. */
+  void addTaskAttemptInfo(TaskAttemptInfo taskAttemptInfo) {
+    attemptInfoMap.put(taskAttemptInfo.getTaskAttemptId(), taskAttemptInfo);
+  }
+
+  /** Links this task to its vertex and registers it with that vertex. */
+  void setVertexInfo(VertexInfo vertexInfo) {
+    Preconditions.checkArgument(vertexInfo != null, "Provide valid vertexInfo");
+    this.vertexInfo = vertexInfo;
+    //link it to vertex
+    vertexInfo.addTaskInfo(this);
+  }
+
+  public final VertexInfo getVertexInfo() {
+    return vertexInfo;
+  }
+
+  /**
+   * Get all task attempts
+   *
+   * @return unmodifiable list of task attempt info, ordered by start time
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts() {
+    List<TaskAttemptInfo> attemptsList = Lists.newLinkedList(attemptInfoMap.values());
+    Collections.sort(attemptsList, orderingOnAttemptStartTime());
+    return Collections.unmodifiableList(attemptsList);
+  }
+
+  /**
+   * Get list of failed task attempts
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskAttemptInfo> getFailedTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.FAILED);
+  }
+
+  /**
+   * Get list of killed task attempts
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskAttemptInfo> getKilledTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.KILLED);
+  }
+
+  /**
+   * Get list of successful task attempts
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskAttemptInfo> getSuccessfulTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of task attempts belonging to a specific state
+   *
+   * @param state attempt state to match against the recorded status
+   * @return unmodifiable List<TaskAttemptInfo> (not sorted)
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(final TaskAttemptState state) {
+    return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(
+        attemptInfoMap.values(), new Predicate<TaskAttemptInfo>() {
+          @Override
+          public boolean apply(TaskAttemptInfo input) {
+            return input.getStatus() != null && input.getStatus().equals(state.toString());
+          }
+        })));
+  }
+
+  /**
+   * Get the set of containers on which the task attempts ran for this task
+   *
+   * @return Multimap<Container, TaskAttemptInfo> task attempt details at container level
+   */
+  public final Multimap<Container, TaskAttemptInfo> getContainersMapping() {
+    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      containerMapping.put(attemptInfo.getContainer(), attemptInfo);
+    }
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  /**
+   * Get the successful task attempt
+   *
+   * @return the attempt matching the recorded successful attempt id; otherwise the earliest
+   *     started attempt whose status is SUCCEEDED; otherwise null
+   */
+  public final TaskAttemptInfo getSuccessfulTaskAttempt() {
+    if (!Strings.isNullOrEmpty(getSuccessfulAttemptId())) {
+      // attempts are keyed by id, so a direct lookup avoids sorting all attempts
+      TaskAttemptInfo attemptInfo = attemptInfoMap.get(getSuccessfulAttemptId());
+      if (attemptInfo != null) {
+        return attemptInfo;
+      }
+    }
+    // fall back to checking status if successful attempt id is not available
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) {
+        return attemptInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get last task attempt to finish
+   *
+   * @return TaskAttemptInfo with the largest finish time, or null if there are no attempts
+   */
+  public final TaskAttemptInfo getLastTaskAttemptToFinish() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return null;
+    }
+
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getFinishTimeInterval() < o2.getFinishTimeInterval()) ? -1 :
+            ((o1.getFinishTimeInterval() == o2.getFinishTimeInterval()) ?
+                0 : 1);
+      }
+    }).max(attemptsList);
+  }
+
+  /**
+   * Get average task attempt duration. Includes successful and failed attempts
+   *
+   * @return float average of getTimeTaken() over all attempts, or 0 if there are none
+   */
+  public final float getAvgTaskAttemptDuration() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return 0;
+    }
+    // accumulate in a long: a float sum loses millisecond precision beyond ~2^24 ms
+    long totalTaskDuration = 0;
+    for (TaskAttemptInfo attemptInfo : attemptsList) {
+      totalTaskDuration += attemptInfo.getTimeTaken();
+    }
+    return ((float) totalTaskDuration) / attemptsList.size();
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnTimeTaken() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+                0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1);
+      }
+    });
+  }
+
+  /**
+   * Get min task attempt duration.  This includes successful/failed task attempts as well
+   *
+   * @return long minimum getTimeTaken(), or 0 if there are no attempts
+   */
+  public final long getMinTaskAttemptDuration() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return 0;
+    }
+
+    return orderingOnTimeTaken().min(attemptsList).getTimeTaken();
+  }
+
+  /**
+   * Get max task attempt duration.  This includes successful/failed task attempts as well
+   *
+   * @return long maximum getTimeTaken(), or 0 if there are no attempts
+   */
+  public final long getMaxTaskAttemptDuration() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return 0;
+    }
+
+    return orderingOnTimeTaken().max(attemptsList).getTimeTaken();
+  }
+
+  public final int getNumberOfTaskAttempts() {
+    return attemptInfoMap.size();
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final String getTaskId() {
+    return taskId;
+  }
+
+  public final long getTimeTaken() {
+    return getFinishTimeInterval() - getStartTimeInterval();
+  }
+
+  public final String getSuccessfulAttemptId() {
+    return successfulAttemptId;
+  }
+
+  /** @return scheduled time exactly as recorded */
+  public final long getAbsoluteScheduleTime() {
+    return scheduledTime;
+  }
+
+  /** @return scheduled time relative to the DAG start time */
+  public final long getScheduledTime() {
+    return scheduledTime - this.getVertexInfo().getDagInfo().getStartTime();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("taskId=").append(getTaskId()).append(", ");
+    sb.append("scheduledTime=").append(getAbsoluteScheduleTime()).append(", ");
+    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+    sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("successfulAttempId=").append(getSuccessfulAttemptId()).append(", ");
+    sb.append("status=").append(getStatus()).append(", ");
+    sb.append("vertexName=").append(getVertexInfo().getVertexName());
+    sb.append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
new file mode 100644
index 0000000..97d18cd
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+/**
+ * Immutable holder for the build time, source revision and version strings.
+ */
+public class VersionInfo {
+
+  private final String version;
+  private final String revision;
+  private final String buildTime;
+
+  /**
+   * @param buildTime build time string, stored as given
+   * @param revision  source revision string, stored as given
+   * @param version   version string, stored as given
+   */
+  public VersionInfo(String buildTime, String revision, String version) {
+    this.version = version;
+    this.revision = revision;
+    this.buildTime = buildTime;
+  }
+
+  /** @return the build time passed to the constructor */
+  public String getBuildTime() {
+    return this.buildTime;
+  }
+
+  /** @return the revision passed to the constructor */
+  public String getRevision() {
+    return this.revision;
+  }
+
+  /** @return the version passed to the constructor */
+  public String getVersion() {
+    return this.version;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
new file mode 100644
index 0000000..50647fe
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class VertexInfo extends BaseInfo {
+
+  private final String vertexId;
+  private final String vertexName;
+  private final long finishTime;
+  private final long initTime;
+  private final long initRequestedTime;
+  private final long startTime;
+  private final long startRequestedTime;
+  
+  private final String diagnostics;
+  private final String processorClass;
+
+  private final int numTasks;
+  private final int failedTasks;
+  private final int completedTasks;
+  private final int succeededTasks;
+  private final int killedTasks;
+  private final int numFailedTaskAttempts;
+
+  private final String status;
+
+  //TaskID --> TaskInfo for internal reference
+  private Map<String, TaskInfo> taskInfoMap;
+
+  private final List<EdgeInfo> inEdgeList;
+  private final List<EdgeInfo> outEdgeList;
+
+  private final List<AdditionalInputOutputDetails> additionalInputInfoList;
+  private final List<AdditionalInputOutputDetails> additionalOutputInfoList;
+  
+  private long avgPostDataExecutionTimeInterval = -1;
+
+  private DagInfo dagInfo;
+
+  VertexInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+            (Constants.TEZ_VERTEX_ID));
+
+    vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
+    taskInfoMap = Maps.newHashMap();
+
+    inEdgeList = Lists.newLinkedList();
+    outEdgeList = Lists.newLinkedList();
+    additionalInputInfoList = Lists.newLinkedList();
+    additionalOutputInfoList = Lists.newLinkedList();
+
+    //Parse additional Info
+    JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    initRequestedTime = otherInfoNode.optLong(Constants.INIT_REQUESTED_TIME);
+    startRequestedTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    initTime = otherInfoNode.optLong(Constants.INIT_TIME);
+    finishTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    numTasks = otherInfoNode.optInt(Constants.NUM_TASKS);
+    failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
+    succeededTasks =
+        otherInfoNode.optInt(Constants.NUM_SUCCEEDED_TASKS);
+    completedTasks =
+        otherInfoNode.optInt(Constants.NUM_COMPLETED_TASKS);
+    killedTasks = otherInfoNode.optInt(Constants.NUM_KILLED_TASKS);
+    numFailedTaskAttempts =
+        otherInfoNode.optInt(Constants.NUM_FAILED_TASKS_ATTEMPTS);
+    vertexName = StringInterner.weakIntern(otherInfoNode.optString(Constants.VERTEX_NAME));
+    processorClass = StringInterner.weakIntern(otherInfoNode.optString(Constants.PROCESSOR_CLASS_NAME));
+    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+  }
+
+  public static VertexInfo create(JSONObject vertexInfoObject) throws
+      JSONException {
+    return new VertexInfo(vertexInfoObject);
+  }
+
+  /**
+   * Update edge details with source and destination vertex objects.
+   */
+  private void updateEdgeInfo() {
+    if (dagInfo.getNumVertices() == dagInfo.getVertices().size()) {
+      //We can update EdgeInfo when all vertices are parsed
+      Map<String, VertexInfo> vertexMapping = dagInfo.getVertexMapping();
+      for (EdgeInfo edge : dagInfo.getEdges()) {
+        VertexInfo sourceVertex = vertexMapping.get(edge.getInputVertexName());
+        VertexInfo destinationVertex = vertexMapping.get(edge.getOutputVertexName());
+        edge.setSourceVertex(sourceVertex);
+        edge.setDestinationVertex(destinationVertex);
+      }
+    }
+  }
+
+  void addTaskInfo(TaskInfo taskInfo) {
+    this.taskInfoMap.put(taskInfo.getTaskId(), taskInfo);
+  }
+
+  void setAdditionalInputInfoList(List<AdditionalInputOutputDetails> additionalInputInfoList) {
+    this.additionalInputInfoList.clear();
+    this.additionalInputInfoList.addAll(additionalInputInfoList);
+  }
+  
+  void setAdditionalOutputInfoList(List<AdditionalInputOutputDetails> additionalOutputInfoList) {
+    this.additionalOutputInfoList.clear();
+    this.additionalOutputInfoList.addAll(additionalOutputInfoList);
+  }
+
+  void addInEdge(EdgeInfo edgeInfo) {
+    this.inEdgeList.add(edgeInfo);
+  }
+
+  void addOutEdge(EdgeInfo edgeInfo) {
+    this.outEdgeList.add(edgeInfo);
+  }
+
+  void setDagInfo(DagInfo dagInfo) {
+    Preconditions.checkArgument(dagInfo != null, "Provide valid dagInfo");
+    this.dagInfo = dagInfo;
+    //link vertex to dagInfo
+    dagInfo.addVertexInfo(this);
+    updateEdgeInfo();
+  }
+
+  public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() {
+    return Collections.unmodifiableList(additionalInputInfoList);
+  }
+
+  public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() {
+    return Collections.unmodifiableList(additionalOutputInfoList);
+  }
+
+  @Override
+  public final long getStartTimeInterval() {
+    return startTime - (dagInfo.getStartTime());
+  }
+
+  public final long getFirstTaskStartTimeInterval() {
+    TaskInfo firstTask = getFirstTaskToStart();
+    if (firstTask == null) {
+      return 0;
+    }
+    return firstTask.getStartTimeInterval();
+  }
+
+  public final long getLastTaskFinishTimeInterval() {
+    if (getLastTaskToFinish() == null || getLastTaskToFinish().getFinishTimeInterval() < 0) {
+        return dagInfo.getFinishTimeInterval();
+    }
+    return getLastTaskToFinish().getFinishTimeInterval();
+  }
+  
+  public final long getAvgPostDataExecutionTimeInterval() {
+    if (avgPostDataExecutionTimeInterval == -1) {
+      long totalExecutionTime = 0;
+      long totalAttempts = 0;
+      for (TaskInfo task : getTasks()) {
+        TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt();
+        if (attempt != null) {
+          // count only time after last data was received
+          long execTime = attempt.getPostDataExecutionTimeInterval();
+          if (execTime >= 0) {
+            totalExecutionTime += execTime;
+            totalAttempts++;
+          }
+        }
+      }
+      if (totalAttempts > 0) {
+        avgPostDataExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts);
+      }
+    }
+    return avgPostDataExecutionTimeInterval;
+  }
+
+  public final long getStartTime() {
+    return startTime;
+  }
+
+  public final long getFinishTime() {
+    return finishTime;
+  }
+
+  public final long getInitTime() {
+    return initTime;
+  }
+  
+  public final long getInitRequestedTime() {
+    return initRequestedTime;
+  }
+
+  public final long getStartRequestedTime() {
+    return startRequestedTime;
+  }
+  
+  @Override
+  public final long getFinishTimeInterval() {
+    long vertexEndTime = finishTime - (dagInfo.getStartTime());
+    if (vertexEndTime < 0) {
+      //probably vertex is not complete or failed in middle. get the last task attempt time
+      for (TaskInfo taskInfo : getTasks()) {
+        vertexEndTime = (taskInfo.getFinishTimeInterval() > vertexEndTime)
+            ? taskInfo.getFinishTimeInterval() : vertexEndTime;
+      }
+    }
+    return vertexEndTime;
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public final String getVertexName() {
+    return vertexName;
+  }
+  
+  public final String getVertexId() {
+    return vertexId;
+  }
+
+  //Quite possible that getFinishTime is not yet recorded for failed vertices (or killed vertices)
+  //Start time of vertex infers that the dependencies are done and AM has inited it.
+  public final long getTimeTaken() {
+    return (getFinishTimeInterval() - getStartTimeInterval());
+  }
+
+  //Time taken for last task to finish  - time taken for first task to start
+  public final long getTimeTakenForTasks() {
+    return (getLastTaskFinishTimeInterval() - getFirstTaskStartTimeInterval());
+  }
+
+  public final long getInitTimeInterval() {
+    return initTime - dagInfo.getStartTime();
+  }
+
+  public final int getNumTasks() {
+    return numTasks;
+  }
+
+  public final int getFailedTasksCount() {
+    return failedTasks;
+  }
+
+  public final int getKilledTasksCount() {
+    return killedTasks;
+  }
+
+  public final int getCompletedTasksCount() {
+    return completedTasks;
+  }
+
+  public final int getSucceededTasksCount() {
+    return succeededTasks;
+  }
+
+  public final int getNumFailedTaskAttemptsCount() {
+    return numFailedTaskAttempts;
+  }
+
+  public final String getProcessorClassName() {
+    return processorClass;
+
+  }
+
+
+  private List<TaskInfo> getTasksInternal() {
+    return Lists.newLinkedList(taskInfoMap.values());
+  }
+
+  /**
+   * Get all tasks
+   *
+   * @return list of taskInfo
+   */
+  public final List<TaskInfo> getTasks() {
+    return Collections.unmodifiableList(getTasksInternal());
+  }
+
+  /**
+   * Get all tasks in sorted order
+   *
+   * @param sorted
+   * @param ordering
+   * @return list of TaskInfo
+   */
+  public final List<TaskInfo> getTasks(boolean sorted, @Nullable Ordering<TaskInfo> ordering) {
+    List<TaskInfo> taskInfoList = getTasksInternal();
+    if (sorted) {
+      Collections.sort(taskInfoList, ((ordering == null) ? orderingOnStartTime() : ordering));
+    }
+    return Collections.unmodifiableList(taskInfoList);
+  }
+
+  /**
+   * Get list of failed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getFailedTasks() {
+    return getTasks(TaskState.FAILED);
+  }
+
+  /**
+   * Get list of killed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getKilledTasks() {
+    return getTasks(TaskState.KILLED);
+  }
+
+  /**
+   * Get list of failed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getSuccessfulTasks() {
+    return getTasks(TaskState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of tasks belonging to a specific state
+   *
+   * @param state
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getTasks(final TaskState state) {
+    return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
+                    (taskInfoMap.values()), new Predicate<TaskInfo>() {
+                  @Override public boolean apply(TaskInfo input) {
+                    return input.getStatus() != null && input.getStatus().equals(state.toString());
+                  }
+                }
+            )
+        )
+    );
+  }
+
+  /**
+   * Get source vertices for this vertex
+   *
+   * @return List<VertexInfo> list of incoming vertices to this vertex
+   */
+  public final List<VertexInfo> getInputVertices() {
+    List<VertexInfo> inputVertices = Lists.newLinkedList();
+    for (EdgeInfo edge : inEdgeList) {
+      inputVertices.add(edge.getSourceVertex());
+    }
+    return Collections.unmodifiableList(inputVertices);
+  }
+
+  /**
+   * Get destination vertices for this vertex
+   *
+   * @return List<VertexInfo> list of output vertices
+   */
+  public final List<VertexInfo> getOutputVertices() {
+    List<VertexInfo> outputVertices = Lists.newLinkedList();
+    for (EdgeInfo edge : outEdgeList) {
+      outputVertices.add(edge.getDestinationVertex());
+    }
+    return Collections.unmodifiableList(outputVertices);
+  }
+
+  // expensive method to call for large DAGs as it creates big lists on every call
+  private List<TaskAttemptInfo> getTaskAttemptsInternal() {
+    List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
+    for (TaskInfo taskInfo : getTasks()) {
+      taskAttemptInfos.addAll(taskInfo.getTaskAttempts());
+    }
+    return taskAttemptInfos;
+  }
+
+  /**
+   * Get all task attempts
+   *
+   * @return List<TaskAttemptInfo> list of attempts
+   */
+  public List<TaskAttemptInfo> getTaskAttempts() {
+    return Collections.unmodifiableList(getTaskAttemptsInternal());
+  }
+
+  /**
+   * Get all task attempts in sorted order
+   *
+   * @param sorted
+   * @param ordering
+   * @return list of TaskAttemptInfo
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted,
+      @Nullable Ordering<TaskAttemptInfo> ordering) {
+    List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal();
+    if (sorted) {
+      Collections.sort(taskAttemptInfos, ((ordering == null) ? orderingOnAttemptStartTime() : ordering));
+    }
+    return Collections.unmodifiableList(taskAttemptInfos);
+  }
+
+  public final TaskInfo getTask(String taskId) {
+    return taskInfoMap.get(taskId);
+  }
+
+  /**
+   * Get incoming edge information for a specific vertex
+   *
+   * @return List<EdgeInfo> list of input edges on this vertex
+   */
+  public final List<EdgeInfo> getInputEdges() {
+    return Collections.unmodifiableList(inEdgeList);
+  }
+
+  /**
+   * Get outgoing edge information for a specific vertex
+   *
+   * @return List<EdgeInfo> list of output edges on this vertex
+   */
+  public final List<EdgeInfo> getOutputEdges() {
+    return Collections.unmodifiableList(outEdgeList);
+  }
+
+  public final Multimap<Container, TaskAttemptInfo> getContainersMapping() {
+    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      containerMapping.put(attemptInfo.getContainer(), attemptInfo);
+    }
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  /**
+   * Get first task to start
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getFirstTaskToStart() {
+    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+    Collections.sort(taskInfoList, new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ?
+                0 : 1);
+      }
+    });
+    return taskInfoList.get(0);
+  }
+
+  /**
+   * Get last task to finish
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getLastTaskToFinish() {
+    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+    Collections.sort(taskInfoList, new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getFinishTimeInterval() > o2.getFinishTimeInterval()) ? -1 :
+            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ?
+                0 : 1);
+      }
+    });
+    return taskInfoList.get(0);
+  }
+
+  /**
+   * Get average task duration
+   *
+   * @return long
+   */
+  public final float getAvgTaskDuration() {
+    float totalTaskDuration = 0;
+    List<TaskInfo> tasksList = getTasks();
+    if (tasksList.size() == 0) {
+      return 0;
+    }
+    for (TaskInfo taskInfo : tasksList) {
+      totalTaskDuration += taskInfo.getTimeTaken();
+    }
+    return ((totalTaskDuration * 1.0f) / tasksList.size());
+  }
+
+  /**
+   * Get min task duration in vertex
+   *
+   * @return long
+   */
+  public final long getMinTaskDuration() {
+    TaskInfo taskInfo = getMinTaskDurationTask();
+    return (taskInfo != null) ? taskInfo.getTimeTaken() : 0;
+  }
+
+  /**
+   * Get max task duration in vertex
+   *
+   * @return long
+   */
+  public final long getMaxTaskDuration() {
+    TaskInfo taskInfo = getMaxTaskDurationTask();
+    return (taskInfo != null) ? taskInfo.getTimeTaken() : 0;
+  }
+
+  private Ordering<TaskInfo> orderingOnTimeTaken() {
+    return Ordering.from(new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ? 0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskInfo> orderingOnStartTime() {
+    return Ordering.from(new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1);
+      }
+    });
+  }
+
+  /**
+   * Get min task duration in vertex
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getMinTaskDurationTask() {
+    List<TaskInfo> taskInfoList = getTasks();
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+
+    return orderingOnTimeTaken().min(taskInfoList);
+  }
+
+  /**
+   * Get max task duration in vertex
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getMaxTaskDurationTask() {
+    List<TaskInfo> taskInfoList = getTasks();
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+    return orderingOnTimeTaken().max(taskInfoList);
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final DagInfo getDagInfo() {
+    return dagInfo;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("vertexName=").append(getVertexName()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("initTime=").append(getInitTimeInterval()).append(", ");
+    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+    sb.append("endTime=").append(getFinishTimeInterval()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("numTasks=").append(getNumTasks()).append(", ");
+    sb.append("processorClassName=").append(getProcessorClassName()).append(", ");
+    sb.append("numCompletedTasks=").append(getCompletedTasksCount()).append(", ");
+    sb.append("numFailedTaskAttempts=").append(getNumFailedTaskAttemptsCount()).append(", ");
+    sb.append("numSucceededTasks=").append(getSucceededTasksCount()).append(", ");
+    sb.append("numFailedTasks=").append(getFailedTasks()).append(", ");
+    sb.append("numKilledTasks=").append(getKilledTasks()).append(", ");
+    sb.append("tasksCount=").append(taskInfoMap.size()).append(", ");
+    sb.append("status=").append(getStatus());
+    sb.append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
new file mode 100644
index 0000000..ffb854a
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.utils;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.List;
+
+@InterfaceAudience.Private
+public class Utils {
+
+  private static final String LOG4J_CONFIGURATION = "log4j.configuration";
+
+  /**
+   * Parse tez counters from json.
+   * Missing or non-object group/counter entries are skipped rather than causing an NPE.
+   *
+   * @param jsonObject counters JSON; may be null
+   * @return TezCounters (empty when {@code jsonObject} is null or has no counter groups)
+   * @throws JSONException if a counter entry lacks its name or value
+   */
+  public static TezCounters parseTezCountersFromJSON(JSONObject jsonObject)
+      throws JSONException {
+    TezCounters counters = new TezCounters();
+
+    if (jsonObject == null) {
+      return counters; //empty counters.
+    }
+
+    final JSONArray counterGroupNodes = jsonObject.optJSONArray(Constants.COUNTER_GROUPS);
+    if (counterGroupNodes == null) {
+      return counters;
+    }
+    for (int i = 0; i < counterGroupNodes.length(); i++) {
+      JSONObject counterGroupNode = counterGroupNodes.optJSONObject(i);
+      if (counterGroupNode == null) {
+        continue; // not a JSON object; nothing to parse
+      }
+      final String groupName = counterGroupNode.optString(Constants.COUNTER_GROUP_NAME);
+      final String groupDisplayName = counterGroupNode.optString(
+          Constants.COUNTER_GROUP_DISPLAY_NAME, groupName);
+
+      CounterGroup group = counters.addGroup(groupName, groupDisplayName);
+
+      final JSONArray counterNodes = counterGroupNode.optJSONArray(Constants.COUNTERS);
+      if (counterNodes == null) {
+        continue; // group registered, but it carries no counters
+      }
+
+      //Parse counter nodes
+      for (int j = 0; j < counterNodes.length(); j++) {
+        JSONObject counterNode = counterNodes.optJSONObject(j);
+        if (counterNode == null) {
+          continue;
+        }
+        final String counterName = counterNode.getString(Constants.COUNTER_NAME);
+        final String counterDisplayName =
+            counterNode.optString(Constants.COUNTER_DISPLAY_NAME, counterName);
+        final long counterValue = counterNode.getLong(Constants.COUNTER_VALUE);
+        TezCounter counter = group.findCounter(
+            counterName,
+            counterDisplayName);
+        counter.setValue(counterValue);
+      }
+    }
+    return counters;
+  }
+
+  /**
+   * Parse the last-data-event dependencies of a task attempt.
+   *
+   * @param jsonObject JSON node holding the {@code Constants.LAST_DATA_EVENTS} array; may be null
+   * @return list of DataDependencyEvent (empty when the node or the array is absent)
+   * @throws JSONException if an array element is not a JSON object
+   */
+  public static List<DataDependencyEvent> parseDataEventDependencyFromJSON(JSONObject jsonObject)
+      throws JSONException {
+    List<DataDependencyEvent> events = Lists.newArrayList();
+    if (jsonObject == null) {
+      return events;
+    }
+    JSONArray fields = jsonObject.optJSONArray(Constants.LAST_DATA_EVENTS);
+    if (fields == null) {
+      return events;
+    }
+    for (int i = 0; i < fields.length(); i++) {
+      JSONObject eventMap = fields.getJSONObject(i);
+      events.add(new DataDependencyEvent(
+          StringInterner.weakIntern(eventMap.optString(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())),
+          eventMap.optLong(Constants.TIMESTAMP)));
+    }
+    return events;
+  }
+
+  /**
+   * Parse events from json and append them to {@code eventList}.
+   * Entries that are not JSON objects are skipped.
+   *
+   * @param eventNodes JSON array of event nodes; may be null (no-op)
+   * @param eventList  list that parsed events are appended to
+   * @throws JSONException declared for API compatibility
+   */
+  public static void parseEvents(JSONArray eventNodes, List<Event> eventList) throws
+      JSONException {
+    if (eventNodes == null) {
+      return;
+    }
+    for (int i = 0; i < eventNodes.length(); i++) {
+      JSONObject eventNode = eventNodes.optJSONObject(i);
+      if (eventNode == null) {
+        continue;
+      }
+      final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
+      final String eventType = eventNode.optString(Constants.EVENT_TYPE);
+      final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
+
+      eventList.add(new Event(eventInfo, eventType, time));
+    }
+  }
+
+  /**
+   * If no {@code log4j.configuration} system property is set, attach a console appender to the
+   * root logger and set its level to INFO. Each call adds another appender.
+   */
+  public static void setupRootLogger() {
+    if (Strings.isNullOrEmpty(System.getProperty(LOG4J_CONFIGURATION))) {
+      //By default print to console with INFO level
+      Logger.getRootLogger().
+          addAppender(new ConsoleAppender(new PatternLayout(PatternLayout.TTCC_CONVERSION_PATTERN)));
+      Logger.getRootLogger().setLevel(Level.INFO);
+    }
+  }
+
+}