Posted to commits@tez.apache.org by je...@apache.org on 2015/12/07 23:42:56 UTC

[3/5] tez git commit: TEZ-2973. Backport Analyzers to branch-0.7

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
new file mode 100644
index 0000000..2b23294
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -0,0 +1,813 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.CallerContext;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.examples.WordCount;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.BaseInfo;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class TestHistoryParser {
+
+  private static MiniDFSCluster miniDFSCluster;
+  private static MiniTezClusterWithTimeline miniTezCluster;
+
+  //location within miniHDFS cluster's hdfs
+  private static Path inputLoc = new Path("/tmp/sample.txt");
+
+  private final static String INPUT = "Input";
+  private final static String OUTPUT = "Output";
+  private final static String TOKENIZER = "Tokenizer";
+  private final static String SUMMATION = "Summation";
+  private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+  private final static String HISTORY_TXT = "history.txt";
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static String TEST_ROOT_DIR =
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tmpDir";
+  private static String TEZ_BASE_DIR =
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tez";
+  private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+
+    setupTezCluster();
+  }
+
+  @AfterClass
+  public static void shutdownCluster() {
+    try {
+      if (miniDFSCluster != null) {
+        miniDFSCluster.shutdown();
+      }
+      if (miniTezCluster != null) {
+        miniTezCluster.stop();
+      }
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
+        FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
+      } catch (IOException e) {
+        //safe to ignore
+      }
+    }
+  }
+
+  // @Before
+  public static void setupTezCluster() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    //Enable per edge counters
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
+        .class.getName());
+
+    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
+
+    miniTezCluster =
+        new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+
+    createSampleFile(inputLoc);
+
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+
+  }
+
+
+  /**
+   * Run a word count example in the mini cluster and check whether it is possible to
+   * download data from ATS and parse it. Also, run with the SimpleHistoryLogging option
+   * and verify that the result matches the ATS data.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob() throws Exception {
+    //Run basic word count example.
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data and verify results
+    DagInfo dagInfoFromATS = getDagInfo(dagId);
+    verifyDagInfo(dagInfoFromATS, true);
+    verifyJobSpecificInfo(dagInfoFromATS);
+
+    //Now run with SimpleHistoryLogging
+    dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", false);
+    Thread.sleep(10000); //Wait for all flushes to complete, to avoid a partially written download.
+
+    DagInfo shDagInfo = getDagInfoFromSimpleHistory(dagId);
+    verifyDagInfo(shDagInfo, false);
+    verifyJobSpecificInfo(shDagInfo);
+
+    //Compare dagInfo by parsing ATS data with DagInfo obtained by parsing SimpleHistoryLog
+    isDAGEqual(dagInfoFromATS, shDagInfo);
+  }
+
+  private DagInfo getDagInfoFromSimpleHistory(String dagId) throws TezException, IOException {
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID
+        .getApplicationId(), 1);
+    Path historyPath = new Path(conf.get("fs.defaultFS")
+        + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+        + applicationAttemptId);
+    FileSystem fs = historyPath.getFileSystem(conf);
+
+    Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT);
+    fs.copyToLocalFile(historyPath, localPath);
+    File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
+
+    //Now parse via SimpleHistory
+    SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  private void verifyJobSpecificInfo(DagInfo dagInfo) {
+    //Job specific
+    assertTrue(dagInfo.getNumVertices() == 2);
+    assertTrue(dagInfo.getName().equals("WordCount"));
+    assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
+        WordCount.TokenProcessor.class.getName()));
+    assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
+        .equals(WordCount.SumProcessor.class.getName()));
+    assertTrue(dagInfo.getEdges().size() == 1);
+    EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
+    assertTrue(edgeInfo.getDataMovementType().
+        equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
+    assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
+    assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
+    assertTrue(dagInfo.getVertices().size() == 2);
+    String lastSourceTA = null;
+    String lastDataEventSourceTA = null;
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      assertTrue(vertexInfo.getKilledTasksCount() == 0);
+      assertTrue(vertexInfo.getInitRequestedTime() > 0);
+      assertTrue(vertexInfo.getInitTime() > 0);
+      assertTrue(vertexInfo.getStartRequestedTime() > 0);
+      assertTrue(vertexInfo.getStartTime() > 0);
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      long finishTime = 0;
+      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+        assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
+        assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
+        assertTrue(taskInfo.getContainersMapping().size() > 0);
+        assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
+        assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
+        assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+        List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
+        if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+          // get the last task to finish and track its successful attempt
+          if (finishTime < taskInfo.getFinishTime()) {
+            finishTime = taskInfo.getFinishTime();
+            lastSourceTA = taskInfo.getSuccessfulAttemptId();
+          }
+        } else {
+          for (TaskAttemptInfo attempt : attempts) {
+            DataDependencyEvent item = attempt.getLastDataEvents().get(0);
+            assertTrue(item.getTimestamp() > 0);
+            
+            if (lastDataEventSourceTA == null) {
+              lastDataEventSourceTA = item.getTaskAttemptId();
+            } else {
+              // all attempts should have the same last data event source TA
+              assertTrue(lastDataEventSourceTA.equals(item.getTaskAttemptId()));
+            }
+          }
+        }
+        for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+          assertTrue(attemptInfo.getCreationTime() > 0);
+          assertTrue(attemptInfo.getAllocationTime() > 0);
+          assertTrue(attemptInfo.getStartTime() > 0);
+        }
+      }
+      assertTrue(vertexInfo.getLastTaskToFinish() != null);
+      if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+        assertTrue(vertexInfo.getInputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputVertices().size() == 1);
+        assertTrue(vertexInfo.getInputVertices().size() == 0);
+      } else {
+        assertTrue(vertexInfo.getInputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputVertices().size() == 0);
+        assertTrue(vertexInfo.getInputVertices().size() == 1);
+      }
+    }
+    assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
+  }
+
+  /**
+   * Run a word count example in the mini cluster,
+   * providing an invalid URL for ATS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
+    //Run basic word count example.
+    String dagId =  runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL", true);
+
+    //Export the data from ATS
+    String atsAddress = "--atsAddress=http://atsHost:8188";
+    String[] args = { "--dagId=" + dagId,
+        "--downloadDir=" + DOWNLOAD_DIR,
+        atsAddress
+      };
+
+    try {
+      ATSImportTool.process(args);
+      fail("Should have failed with ParseException");
+    } catch (ParseException e) {
+      //expected exception
+    }
+  }
+
+  /**
+   * Run a failed job and parse the data from ATS
+   */
+  @Test
+  public void testParserWithFailedJob() throws Exception {
+    //Run a job which would fail
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
+        .getName(), "WordCount-With-Exception", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data
+    DagInfo dagInfo = getDagInfo(dagId);
+
+    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
+    verifyDagInfo(dagInfo, true);
+
+    //Dag specific
+    VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
+    assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed
+    assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4);
+    assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
+
+    assertTrue(dagInfo.getFailedVertices().size() == 1);
+    assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
+    assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
+    assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
+
+    assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
+
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4);
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1);
+    verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5);
+
+    verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
+        "TaskCounter_Tokenizer_INPUT_Input", 10);
+    verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
+    verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation",
+        20); //Every line has 2 words. 10 lines x 2 words = 20
+    verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
+
+    for (TaskInfo taskInfo : summationVertex.getTasks()) {
+      TaskAttemptInfo lastAttempt = null;
+      for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+        if (lastAttempt != null) {
+          // failed attempt should be causal TA of next attempt
+          assertTrue(lastAttempt.getTaskAttemptId().equals(attemptInfo.getCreationCausalTA()));
+          assertTrue(lastAttempt.getTerminationCause() != null);
+        }
+        lastAttempt = attemptInfo;
+      }
+    }
+
+    //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
+    //TaskCounter.REDUCE_INPUT_RECORDS
+
+    //Verify if the processor exception is given in diagnostics
+    assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
+
+  }
+
+  /**
+   * Explicit equality checks are done here instead of adding equals() to DAG/Vertex/Edge,
+   * where hashCode would also need to change. Some custom comparisons are also done here
+   * for unit testing.
+   */
+  private void isDAGEqual(DagInfo dagInfo1, DagInfo dagInfo2) {
+    assertNotNull(dagInfo1);
+    assertNotNull(dagInfo2);
+    assertEquals(dagInfo1.getStatus(), dagInfo2.getStatus());
+    isEdgeEqual(dagInfo1.getEdges(), dagInfo2.getEdges());
+    isVertexEqual(dagInfo1.getVertices(), dagInfo2.getVertices());
+  }
+
+  private void isVertexEqual(VertexInfo vertexInfo1, VertexInfo vertexInfo2) {
+    assertTrue(vertexInfo1 != null);
+    assertTrue(vertexInfo2 != null);
+    assertTrue(vertexInfo1.getVertexName().equals(vertexInfo2.getVertexName()));
+    assertTrue(vertexInfo1.getProcessorClassName().equals(vertexInfo2.getProcessorClassName()));
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    assertTrue(vertexInfo1.getCompletedTasksCount() == vertexInfo2.getCompletedTasksCount());
+    assertTrue(vertexInfo1.getStatus().equals(vertexInfo2.getStatus()));
+
+    isEdgeEqual(vertexInfo1.getInputEdges(), vertexInfo2.getInputEdges());
+    isEdgeEqual(vertexInfo1.getOutputEdges(), vertexInfo2.getOutputEdges());
+
+    assertTrue(vertexInfo1.getInputVertices().size() == vertexInfo2.getInputVertices().size());
+    assertTrue(vertexInfo1.getOutputVertices().size() == vertexInfo2.getOutputVertices().size());
+
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    isTaskEqual(vertexInfo1.getTasks(), vertexInfo2.getTasks());
+  }
+
+  private void isVertexEqual(List<VertexInfo> vertexList1, List<VertexInfo> vertexList2) {
+    assertTrue("Vertices sizes should be the same", vertexList1.size() == vertexList2.size());
+    Iterator<VertexInfo> it1 = vertexList1.iterator();
+    Iterator<VertexInfo> it2 = vertexList2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      VertexInfo info1 = it1.next();
+      VertexInfo info2 = it2.next();
+      isVertexEqual(info1, info2);
+    }
+  }
+
+  private void isEdgeEqual(EdgeInfo edgeInfo1, EdgeInfo edgeInfo2) {
+    assertTrue(edgeInfo1 != null);
+    assertTrue(edgeInfo2 != null);
+    String info1 = edgeInfo1.toString();
+    String info2 = edgeInfo2.toString();
+    assertTrue(info1.equals(info2));
+  }
+
+  private void isEdgeEqual(Collection<EdgeInfo> info1, Collection<EdgeInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<EdgeInfo> it1 = info1.iterator();
+    Iterator<EdgeInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isEdgeEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(Collection<TaskInfo> info1, Collection<TaskInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskInfo> it1 = info1.iterator();
+    Iterator<TaskInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) {
+    assertTrue(taskInfo1 != null);
+    assertTrue(taskInfo2 != null);
+    assertTrue(taskInfo1.getVertexInfo() != null);
+    assertTrue(taskInfo2.getVertexInfo() != null);
+    assertTrue(taskInfo1.getStatus().equals(taskInfo2.getStatus()));
+    assertTrue(
+        taskInfo1.getVertexInfo().getVertexName()
+            .equals(taskInfo2.getVertexInfo().getVertexName()));
+    isTaskAttemptEqual(taskInfo1.getTaskAttempts(), taskInfo2.getTaskAttempts());
+
+    //Verify counters
+    isCountersSame(taskInfo1, taskInfo2);
+  }
+
+  private void isCountersSame(BaseInfo info1, BaseInfo info2) {
+    isCounterSame(info1.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()),
+        info2.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.SPILLED_RECORDS.name()),
+        info2.getCounter(TaskCounter.SPILLED_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_BYTES.name()),
+        info2.getCounter(TaskCounter.OUTPUT_BYTES.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()));
+  }
+
+  private void isCounterSame(Map<String, TezCounter> counter1, Map<String, TezCounter> counter2) {
+    for (Map.Entry<String, TezCounter> entry : counter1.entrySet()) {
+      String source = entry.getKey();
+      long val = entry.getValue().getValue();
+
+      //check that the other counter map has the same value for this source
+      assertTrue(counter2.containsKey(source));
+      assertTrue(counter2.get(source).getValue() == val);
+    }
+  }
+
+  private void isTaskAttemptEqual(Collection<TaskAttemptInfo> info1,
+      Collection<TaskAttemptInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskAttemptInfo> it1 = info1.iterator();
+    Iterator<TaskAttemptInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskAttemptEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskAttemptEqual(TaskAttemptInfo info1, TaskAttemptInfo info2) {
+    assertTrue(info1 != null);
+    assertTrue(info2 != null);
+    assertTrue(info1.getTaskInfo() != null);
+    assertTrue(info2.getTaskInfo() != null);
+    assertTrue(info1.getStatus().equals(info2.getStatus()));
+    assertTrue(info1.getTaskInfo().getVertexInfo().getVertexName().equals(info2.getTaskInfo()
+        .getVertexInfo().getVertexName()));
+
+    //Verify counters
+    isCountersSame(info1, info2);
+  }
+
+
+  /**
+   * Create a sample input file for the word count job
+   *
+   * @param inputLoc
+   * @throws IOException
+   */
+  private static void createSampleFile(Path inputLoc) throws IOException {
+    fs.deleteOnExit(inputLoc);
+    FSDataOutputStream out = fs.create(inputLoc);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    for (int i = 0; i < 10; i++) {
+      writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
+      writer.newLine();
+    }
+    writer.close();
+  }
+
+  private DagInfo getDagInfo(String dagId) throws TezException {
+    //Parse downloaded contents
+    File downloadedFile = new File(DOWNLOAD_DIR
+        + Path.SEPARATOR + dagId + ".zip");
+    ATSFileParser parser = new ATSFileParser(downloadedFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  private void verifyCounter(Map<String, TezCounter> counterMap,
+      String counterGroupName, long expectedVal) {
+    //Iterate through group-->tezCounter
+    for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
+      if (counterGroupName != null) {
+        if (entry.getKey().equals(counterGroupName)) {
+          assertTrue(entry.getValue().getValue() == expectedVal);
+        }
+      } else {
+        assertTrue(entry.getValue().getValue() == expectedVal);
+      }
+    }
+  }
+
+  TezClient getTezClient(boolean withTimeline) throws Exception {
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    if (withTimeline) {
+      tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, withTimeline);
+      tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+    } else {
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          SimpleHistoryLoggingService.class.getName());
+    }
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+
+    TezClient tezClient = TezClient.create("WordCount", tezConf, false);
+    tezClient.start();
+    tezClient.waitTillReady();
+    return tezClient;
+  }
+
+  private String runWordCount(String tokenizerProcessor, String summationProcessor,
+      String dagName, boolean withTimeline)
+      throws Exception {
+    //HDFS path
+    Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
+
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
+        TextInputFormat.class, inputLoc.toString()).build();
+
+    DataSinkDescriptor dataSink =
+        MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build();
+
+    Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
+        tokenizerProcessor)).addDataSource(INPUT, dataSource);
+
+    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    Vertex summationVertex = Vertex.create(SUMMATION,
+        ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink);
+
+    // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge
+    DAG dag = DAG.create(dagName);
+    dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
+        Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
+
+    TezClient tezClient = getTezClient(withTimeline);
+
+    // Update Caller Context
+    CallerContext callerContext = CallerContext.create("TezExamples", "Tez WordCount Example Job");
+    ApplicationId appId = tezClient.getAppMasterApplicationId();
+    if (appId == null) {
+      appId = ApplicationId.newInstance(1001L, 1);
+    }
+    callerContext.setCallerIdAndType(appId.toString(), "TezApplication");
+    dag.setCallerContext(callerContext);
+
+    DAGClient client = tezClient.submitDAG(dag);
+    client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+    TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
+
+    if (tezClient != null) {
+      tezClient.stop();
+    }
+    return tezDAGID.toString();
+  }
+
+  /**
+   * Processor that simply throws an exception.
+   */
+  public static class FailProcessor extends SimpleMRProcessor {
+    public FailProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      throw new Exception("Failing this processor for some reason");
+    }
+  }
+
+  private void verifyDagInfo(DagInfo dagInfo, boolean ats) {
+    if (ats) {
+      VersionInfo versionInfo = dagInfo.getVersionInfo();
+      assertTrue(versionInfo != null); //should be present post 0.5.4
+      assertTrue(versionInfo.getVersion() != null);
+      assertTrue(versionInfo.getRevision() != null);
+      assertTrue(versionInfo.getBuildTime() != null);
+    }
+
+    assertTrue(dagInfo.getStartTime() > 0);
+    assertTrue(dagInfo.getFinishTimeInterval() > 0);
+    assertTrue(dagInfo.getStartTimeInterval() == 0);
+    assertTrue(dagInfo.getStartTime() > 0);
+    if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
+      assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
+    }
+    assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval());
+
+    assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
+    assertTrue(dagInfo.getTimeTaken() > 0);
+
+    assertNotNull(dagInfo.getCallerContext());
+    assertEquals("TezExamples", dagInfo.getCallerContext().getContext());
+    assertEquals("Tez WordCount Example Job", dagInfo.getCallerContext().getBlob());
+    assertNotNull(dagInfo.getCallerContext().getCallerId());
+    assertEquals("TezApplication", dagInfo.getCallerContext().getCallerType());
+
+    //Verify all vertices
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
+    }
+
+    VertexInfo fastestVertex = dagInfo.getFastestVertex();
+    assertTrue(fastestVertex != null);
+
+    if (dagInfo.getStatus().equals(DAGState.SUCCEEDED.toString())) {
+      assertTrue(dagInfo.getSlowestVertex() != null);
+    }
+  }
+
+  private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
+    assertTrue(vertexInfo != null);
+    if (hasFailedTasks) {
+      assertTrue(vertexInfo.getFailedTasksCount() > 0);
+    }
+    assertTrue(vertexInfo.getStartTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTime() > 0);
+    assertTrue(vertexInfo.getFinishTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval());
+    assertTrue(vertexInfo.getVertexName() != null);
+    if (!hasFailedTasks) {
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      assertTrue(vertexInfo.getFailedTasks().size() == 0);
+      assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
+      assertTrue(vertexInfo.getFailedTasksCount() == 0);
+      assertTrue(vertexInfo.getAvgTaskDuration() > 0);
+      assertTrue(vertexInfo.getMaxTaskDuration() > 0);
+      assertTrue(vertexInfo.getMinTaskDuration() > 0);
+      assertTrue(vertexInfo.getTimeTaken() > 0);
+      assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
+      assertTrue(vertexInfo.getCompletedTasksCount() > 0);
+      assertTrue(vertexInfo.getFirstTaskToStart() != null);
+      assertTrue(vertexInfo.getSucceededTasksCount() > 0);
+      assertTrue(vertexInfo.getTasks().size() > 0);
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+      if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
+        verifyTask(taskInfo, false);
+      }
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
+      verifyTask(taskInfo, true);
+    }
+
+    assertTrue(vertexInfo.getProcessorClassName() != null);
+    assertTrue(vertexInfo.getStatus() != null);
+    assertTrue(vertexInfo.getDagInfo() != null);
+    assertTrue(vertexInfo.getInitTimeInterval() > 0);
+    assertTrue(vertexInfo.getNumTasks() > 0);
+  }
+
+  private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
+    assertTrue(taskInfo != null);
+    assertTrue(taskInfo.getStatus() != null);
+    assertTrue(taskInfo.getStartTimeInterval() > 0);
+
+    //Not testing for killed attempts. So if there are no failures, it should succeed
+    if (!hasFailedAttempts) {
+      assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
+      assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo
+          .getFinishTimeInterval());
+      assertTrue(
+          taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
+      assertTrue(taskInfo.getSuccessfulAttemptId() != null);
+      assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+    }
+    assertTrue(taskInfo.getTaskId() != null);
+
+    for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+      verifyTaskAttemptInfo(attemptInfo);
+    }
+  }
+
+  private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
+    if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
+        .equals(TaskAttemptState.SUCCEEDED.toString())) {
+      assertTrue(attemptInfo.getStartTimeInterval() > 0);
+      assertTrue(attemptInfo.getFinishTimeInterval() > 0);
+      assertTrue(attemptInfo.getCreationTime() > 0);
+      assertTrue(attemptInfo.getAllocationTime() > 0);
+      assertTrue(attemptInfo.getStartTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
+      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval());
+      assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval());
+      assertTrue(attemptInfo.getNodeId() != null);
+      assertTrue(attemptInfo.getTimeTaken() != -1);
+      assertTrue(attemptInfo.getEvents() != null);
+      assertTrue(attemptInfo.getTezCounters() != null);
+      assertTrue(attemptInfo.getContainer() != null);
+    }
+    assertTrue(attemptInfo.getTaskInfo() != null);
+  }
+}
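
For reference, the download-then-parse flow this test exercises also works standalone
against a running ATS instance (pass --atsAddress if the default web address does not
apply). A minimal sketch; the dagId and download directory below are placeholders:

    import java.io.File;

    import org.apache.tez.history.ATSImportTool;
    import org.apache.tez.history.parser.ATSFileParser;
    import org.apache.tez.history.parser.datamodel.DagInfo;
    import org.apache.tez.history.parser.datamodel.VertexInfo;

    public class ParserUsageSketch {
      public static void main(String[] args) throws Exception {
        String dagId = "dag_1448000000000_0001_1"; // placeholder DAG id
        String downloadDir = "/tmp/ats-download";  // placeholder local directory

        // Download the DAG data from ATS; writes <downloadDir>/<dagId>.zip
        ATSImportTool.process(new String[] {
            "--dagId=" + dagId, "--downloadDir=" + downloadDir });

        // Parse the downloaded zip into the datamodel
        ATSFileParser parser = new ATSFileParser(new File(downloadDir, dagId + ".zip"));
        DagInfo dagInfo = parser.getDAGData(dagId);

        System.out.println(dagInfo.getName() + " finished as " + dagInfo.getStatus());
        for (VertexInfo vertex : dagInfo.getVertices()) {
          System.out.println(vertex.getVertexName() + " took " + vertex.getTimeTaken() + " ms");
        }
      }
    }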

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml
new file mode 100644
index 0000000..5bebb05
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml
@@ -0,0 +1,28 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+
+  <Match>
+    <Class name="org.apache.tez.analyzer.CSVResult"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.analyzer.CSVResult"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml
new file mode 100644
index 0000000..627c444
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/pom.xml
@@ -0,0 +1,168 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-perf-analyzer</artifactId>
+    <version>0.7.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-job-analyzer</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-history-parser</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.tez.analyzer.plugins.AnalyzerDriver</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
new file mode 100644
index 0000000..6021c58
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+
+public interface Analyzer {
+
+  /**
+   * Analyze Dag
+   *
+   * @param dagInfo
+   * @throws TezException
+   */
+  public void analyze(DagInfo dagInfo) throws TezException;
+
+  /**
+   * Get the result of analysis
+   *
+   * @return analysis result
+   * @throws TezException
+   */
+  public Result getResult() throws TezException;
+
+  /**
+   * Get name of the analyzer
+   *
+   * @return name of the analyzer
+   */
+  public String getName();
+
+  /**
+   * Get description of the analyzer
+   *
+   * @return description of analyzer
+   */
+  public String getDescription();
+
+  /**
+   * Get config properties related to this analyzer
+   *
+   * @return config related to analyzer
+   */
+  public Configuration getConfiguration();
+}
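
A minimal sketch of a custom implementation, using only the interface above and the
CSVResult helper from this patch. The class name and headers are illustrative; the
analyzers shipped in this patch additionally extend TezAnalyzerBase for ToolRunner
integration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.analyzer.Analyzer;
    import org.apache.tez.analyzer.CSVResult;
    import org.apache.tez.dag.api.TezException;
    import org.apache.tez.history.parser.datamodel.DagInfo;
    import org.apache.tez.history.parser.datamodel.VertexInfo;

    /** Emits one CSV record per vertex with its task count and time taken. */
    public class VertexSummaryAnalyzer implements Analyzer {

      private static final String[] HEADERS = { "vertexName", "numTasks", "timeTakenMs" };

      private final Configuration config;
      private final CSVResult csvResult = new CSVResult(HEADERS);

      public VertexSummaryAnalyzer(Configuration config) {
        this.config = config;
      }

      @Override
      public void analyze(DagInfo dagInfo) throws TezException {
        for (VertexInfo vertex : dagInfo.getVertices()) {
          csvResult.addRecord(new String[] {
              vertex.getVertexName(),
              Long.toString(vertex.getNumTasks()),
              Long.toString(vertex.getTimeTaken()) });
        }
      }

      @Override
      public CSVResult getResult() {
        return csvResult;
      }

      @Override
      public String getName() {
        return "VertexSummaryAnalyzer";
      }

      @Override
      public String getDescription() {
        return "Prints per-vertex task count and duration";
      }

      @Override
      public Configuration getConfiguration() {
        return config;
      }
    }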

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
new file mode 100644
index 0000000..5246c68
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.tez.dag.api.TezException;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Simple placeholder for storing CSV results.
+ * Contains headers and records in string format.
+ */
+public class CSVResult implements Result {
+
+  private final String[] headers;
+  private final List<String[]> recordsList;
+  private String comments;
+
+  public CSVResult(String[] header) {
+    this.headers = header;
+    recordsList = Lists.newLinkedList();
+  }
+
+  public String[] getHeaders() {
+    return headers;
+  }
+
+  public void addRecord(String[] record) {
+    Preconditions.checkArgument(record != null, "Record can't be null");
+    Preconditions.checkArgument(record.length == headers.length, "Record length " + record.length
+        + " does not match headers length " + headers.length);
+    recordsList.add(record);
+  }
+
+  public Iterator<String[]> getRecordsIterator() {
+    return Iterators.unmodifiableIterator(recordsList.iterator());
+  }
+
+
+  public void setComments(String comments) {
+    this.comments = comments;
+  }
+
+  @Override public String toJson() throws TezException {
+    return "";
+  }
+
+  @Override public String getComments() {
+    return comments;
+  }
+
+  @Override public String toString() {
+    return "CSVResult{" +
+        "headers=" + Arrays.toString(headers) +
+        ", recordsList=" + recordsList +
+        '}';
+  }
+
+  //For testing
+  public void dumpToFile(String fileName) throws IOException {
+    OutputStreamWriter writer = new OutputStreamWriter(
+        new FileOutputStream(new File(fileName)),
+        Charset.forName("UTF-8").newEncoder());
+    BufferedWriter bw = new BufferedWriter(writer);
+    bw.write(Joiner.on(",").join(headers));
+    bw.newLine();
+    for (String[] record : recordsList) {
+
+      if (record.length != headers.length) {
+        continue; //LOG error msg?
+      }
+
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < record.length; i++) {
+        sb.append(!Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
+        if (i < record.length - 1) {
+          sb.append(",");
+        }
+      }
+      bw.write(sb.toString());
+      bw.newLine();
+    }
+    bw.flush();
+    bw.close();
+  }
+}
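
Usage is straightforward: add records matching the header arity, then iterate or dump.
A short sketch (the header names, record values, and output path are placeholders):

    import org.apache.tez.analyzer.CSVResult;

    public class CSVResultDemo {
      public static void main(String[] args) throws Exception {
        CSVResult result = new CSVResult(new String[] { "vertexName", "reuseCount" });
        result.addRecord(new String[] { "Tokenizer", "3" });
        result.addRecord(new String[] { "Summation", "1" });
        result.setComments("container reuse per vertex");

        // Writes the header line, then one comma-separated line per record
        result.dumpToFile("/tmp/reuse.csv");
      }
    }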

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java
new file mode 100644
index 0000000..d1881eb
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import org.apache.tez.dag.api.TezException;
+
+public interface Result {
+
+  /**
+   * Convert result to json format
+   *
+   * @return json
+   * @throws TezException
+   */
+  public String toJson() throws TezException;
+
+  /**
+   * Recommendation / comments about the analysis if any.
+   *
+   * @return comments
+   */
+  public String getComments();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
new file mode 100644
index 0000000..57b21cb
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.util.ProgramDriver;
+
+public class AnalyzerDriver {
+
+  public static void main(String argv[]){
+    int exitCode = -1;
+    ProgramDriver pgd = new ProgramDriver();
+    try {
+      pgd.addClass("CriticalPath", CriticalPathAnalyzer.class,
+          "Find the critical path of a DAG");
+      pgd.addClass("ContainerReuseAnalyzer", ContainerReuseAnalyzer.class,
+          "Print container reuse details in a DAG");
+      pgd.addClass("LocalityAnalyzer", LocalityAnalyzer.class,
+          "Print locality details in a DAG");
+      pgd.addClass("ShuffleTimeAnalyzer", ShuffleTimeAnalyzer.class,
+          "Analyze the shuffle time details in a DAG");
+      pgd.addClass("SkewAnalyzer", SkewAnalyzer.class,
+          "Analyze the skew details in a DAG");
+      pgd.addClass("SlowestVertexAnalyzer", SlowestVertexAnalyzer.class,
+          "Print slowest vertex details in a DAG");
+      pgd.addClass("SlowNodeAnalyzer", SlowNodeAnalyzer.class,
+          "Print node details in a DAG");
+      pgd.addClass("SlowTaskIdentifier", SlowTaskIdentifier.class,
+          "Print slow task details in a DAG");
+      pgd.addClass("SpillAnalyzer", SpillAnalyzerImpl.class,
+          "Print spill details in a DAG");
+      pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
+          "Print the task concurrency details in a DAG");
+      pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,
+          "Find critical path at vertex level in a DAG");
+      exitCode = pgd.run(argv);
+    } catch(Throwable e){
+      e.printStackTrace();
+    }
+
+    System.exit(exitCode);
+  }
+
+}
\ No newline at end of file
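
Since the job-analyzer pom above sets AnalyzerDriver as the jar's Main-Class,
ProgramDriver dispatches on the first argument. Invocation would look roughly like
the line below; the jar name follows from the pom's artifactId and version, and the
remaining options depend on the analyzer chosen:

    hadoop jar tez-job-analyzer-0.7.1-SNAPSHOT.jar CriticalPath <analyzer options>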

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
new file mode 100644
index 0000000..5b862f8
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.Container;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+
+
+/**
+ * Get container reuse information on a per-vertex basis.
+ */
+public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  private final Configuration config;
+
+  private static final String[] headers =
+      { "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };
+
+  private final CSVResult csvResult;
+
+  public ContainerReuseAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      Multimap<Container, TaskAttemptInfo> containers = vertexInfo.getContainersMapping();
+      for (Container container : containers.keySet()) {
+        List<String> record = Lists.newLinkedList();
+        record.add(vertexInfo.getVertexName());
+        record.add(String.valueOf(vertexInfo.getTaskAttempts().size()));
+        record.add(container.getHost());
+        record.add(container.getId());
+        record.add(Integer.toString(containers.get(container).size()));
+        csvResult.addRecord(record.toArray(new String[record.size()]));
+      }
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Container Reuse Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Get details on container reuse analysis";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
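
Besides the ToolRunner entry point in main(), the analyzer can be driven directly once
a DagInfo has been parsed from the DAG's history (see the tez-history-parser tests for
how a DagInfo is obtained). A minimal sketch, assuming the DagInfo is already in hand:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.analyzer.plugins.ContainerReuseAnalyzer;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class ReuseReport {
      // dagInfo is assumed to come from the history parser
      static void report(DagInfo dagInfo) throws Exception {
        ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(new Configuration());
        analyzer.analyze(dagInfo);   // one CSV record per (vertex, container) pair
        analyzer.printResults();     // inherited from TezAnalyzerBase, as used in main()
      }
    }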

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
new file mode 100644
index 0000000..d4efdf9
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -0,0 +1,646 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
+import org.apache.tez.analyzer.utils.SVGUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.history.parser.datamodel.Container;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
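+/**
+ * Reconstructs the critical path of a DAG from its history and analyzes the
+ * steps on it for stragglers, scheduling waves and allocation overhead.
+ */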
+public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
+  String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name());
+
+  public enum CriticalPathDependency {
+    DATA_DEPENDENCY,
+    INIT_DEPENDENCY,
+    COMMIT_DEPENDENCY,
+    RETRY_DEPENDENCY,
+    OUTPUT_RECREATE_DEPENDENCY
+  }
+
+  public static final String DRAW_SVG = "tez.critical-path-analyzer.draw-svg";
+
+  public static class CriticalPathStep {
+    public enum EntityType {
+      ATTEMPT,
+      VERTEX_INIT,
+      DAG_COMMIT
+    }
+
+    EntityType type;
+    TaskAttemptInfo attempt;
+    CriticalPathDependency reason; // reason linking this to the previous step on the critical path
+    long startCriticalPathTime; // time at which attempt is on critical path
+    long stopCriticalPathTime; // time at which attempt is off critical path
+    List<String> notes = Lists.newLinkedList();
+    
+    public CriticalPathStep(TaskAttemptInfo attempt, EntityType type) {
+      this.type = type;
+      this.attempt = attempt;
+    }
+    public EntityType getType() {
+      return type;
+    }
+    public TaskAttemptInfo getAttempt() {
+      return attempt;
+    }
+    public long getStartCriticalTime() {
+      return startCriticalPathTime;
+    }
+    public long getStopCriticalTime() {
+      return stopCriticalPathTime;
+    }
+    public CriticalPathDependency getReason() {
+      return reason;
+    }
+    public List<String> getNotes() {
+      return notes;
+    }
+  }
+  
+  List<CriticalPathStep> criticalPath = Lists.newLinkedList();
+  
+  Map<String, TaskAttemptInfo> attempts = Maps.newHashMap();
+
+  int maxConcurrency = 0;
+  ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();
+
+  public CriticalPathAnalyzer() {
+  }
+
+  @Override 
+  public void analyze(DagInfo dagInfo) throws TezException {
+    // get all attempts in the DAG and find the last failed/succeeded attempt.
+    // ignore killed attempts to handle kills that happen upon DAG completion
+    TaskAttemptInfo lastAttempt = null;
+    long lastAttemptFinishTime = 0;
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      for (TaskInfo task : vertex.getTasks()) {
+        for (TaskAttemptInfo attempt : task.getTaskAttempts()) { 
+          attempts.put(attempt.getTaskAttemptId(), attempt);
+          if (attempt.getStatus().equals(succeededState) ||
+              attempt.getStatus().equals(failedState)) {
+            if (lastAttemptFinishTime < attempt.getFinishTime()) {
+              lastAttempt = attempt;
+              lastAttemptFinishTime = attempt.getFinishTime();
+            }
+          }
+        }
+      }
+    }
+    
+    if (lastAttempt == null) {
+      System.out.println("Cannot find last attempt to finish in DAG " + dagInfo.getDagId());
+      return;
+    }
+    
+    createCriticalPath(dagInfo, lastAttempt, lastAttemptFinishTime, attempts);
+    
+    analyzeCriticalPath(dagInfo);
+
+    if (getConf().getBoolean(DRAW_SVG, true)) {
+      saveCriticalPathAsSVG(dagInfo);
+    }
+  }
+  
+  public List<CriticalPathStep> getCriticalPath() {
+    return criticalPath;
+  }
+  
+  private void saveCriticalPathAsSVG(DagInfo dagInfo) {
+    SVGUtils svg = new SVGUtils();
+    String outputFileName = getOutputDir() + File.separator + dagInfo.getDagId() + ".svg";
+    System.out.println("Writing output to: " + outputFileName);
+    svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
+  }
+  
+  static class TimeInfo implements Comparable<TimeInfo> {
+    long timestamp;
+    int count;
+    boolean start;
+    TimeInfo(long timestamp, boolean start) {
+      this.timestamp = timestamp;
+      this.start = start;
+    }
+    
+    @Override
+    public int compareTo(TimeInfo o) {
+      return Long.compare(this.timestamp, o.timestamp);
+    }
+    
+    @Override
+    public int hashCode() {
+      return (int)((timestamp >> 32) ^ timestamp);
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null) {
+        return false;
+      }
+      if (o.getClass() == this.getClass()) {
+        TimeInfo other = (TimeInfo) o;
+        return (this.compareTo(other) == 0);
+      }
+      else {
+        return false;
+      }
+    }
+  }
+  
+  private void determineConcurrency(DagInfo dag) {
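+    // sweep the attempt start/finish events in timestamp order, incrementing the
+    // running concurrency on each start and decrementing it on each finish; e.g.
+    // starts at t=1,2 and finishes at t=5,6 give concurrency 1,2,1,0 over time
+    // and a maxConcurrency of 2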
+    ArrayList<TimeInfo> timeInfo = Lists.newArrayList();
+    for (VertexInfo v : dag.getVertices()) {
+      for (TaskInfo t : v.getTasks()) {
+        for (TaskAttemptInfo a : t.getTaskAttempts()) {
+          if (a.getStartTime() > 0) {
+            timeInfo.add(new TimeInfo(a.getStartTime(), true));
+            timeInfo.add(new TimeInfo(a.getFinishTime(), false));
+          }
+        }
+      }
+    }
+    Collections.sort(timeInfo);
+    
+    int concurrency = 0;
+    TimeInfo lastTimeInfo = null;
+    for (TimeInfo t : timeInfo) {
+      concurrency += (t.start) ? 1 : -1;
+      maxConcurrency = (concurrency > maxConcurrency) ? concurrency : maxConcurrency;
+      if (lastTimeInfo == null || lastTimeInfo.timestamp < t.timestamp) {
+        lastTimeInfo = t;
+        lastTimeInfo.count = concurrency;
+        concurrencyByTime.add(lastTimeInfo);        
+      } else {
+        // lastTimeInfo.timestamp == t.timestamp
+        lastTimeInfo.count = concurrency;
+      }
+    }
+  }
+  
+  private int getIntervalMaxConcurrency(long begin, long end) {
+    int concurrency = 0;
+    for (TimeInfo timeInfo : concurrencyByTime) {
+      if (timeInfo.timestamp < begin) {
+        continue;
+      }
+      if (timeInfo.timestamp > end) {
+        break;
+      }
+      if (timeInfo.count > concurrency) {
+        concurrency = timeInfo.count;
+      }
+    }
+    return concurrency;
+  }
+  
+  private void analyzeAllocationOverhead(DagInfo dag) {
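+    // first collect all internally preempted attempts; then, for each critical path
+    // attempt that was allocated only after it became critical, inspect its container's
+    // reuse history and any cross-vertex preemptions during the wait to explain the delay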
+    List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
+    for (VertexInfo v : dag.getVertices()) {
+      for (TaskInfo t : v.getTasks()) {
+        for (TaskAttemptInfo a : t.getTaskAttempts()) {
+          if (a.getTerminationCause().equals(
+              TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name())) {
+            System.out.println("Found preempted attempt " + a.getTaskAttemptId());
+            preemptedAttempts.add(a);
+          }
+        }
+      }
+    }
+    for (int i = 0; i < criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i);
+      TaskAttemptInfo attempt = step.attempt;
+      if (step.getType() != EntityType.ATTEMPT) {
+        continue;
+      }
+      
+      long creationTime = attempt.getCreationTime();
+      long allocationTime = attempt.getAllocationTime();
+      long finishTime = attempt.getFinishTime();
+      if (allocationTime < step.startCriticalPathTime) {
+        // allocated before it became critical
+        continue;
+      }
+
+      // the attempt became critical before it was allocated, so the allocation wait needs analysis
+      Container container = attempt.getContainer();
+      if (container != null) {
+        Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
+        if (attempts != null && !attempts.isEmpty()) {
+          // arrange attempts by allocation time
+          List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
+          Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
+          // walk the list to record allocation time before the current attempt
+          long containerPreviousAllocatedTime = 0;
+          int reUsesForVertex = 1;
+          for (TaskAttemptInfo containerAttempt : attemptsList) {
+            if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
+              break;
+            }
+            if (containerAttempt.getTaskInfo().getVertexInfo().getVertexId().equals(
+                attempt.getTaskInfo().getVertexInfo().getVertexId())) {
+              // another task from the same vertex ran in this container. So there are multiple 
+              // waves for this vertex on this container.
+              reUsesForVertex++;
+            }
+            long cAllocTime = containerAttempt.getAllocationTime();
+            long cFinishTime = containerAttempt.getFinishTime();
+            if (cFinishTime > creationTime) {
+              // for containerAttempts that used the container while this attempt was waiting
+              // add up time container was allocated to containerAttempt. Account for allocations
+              // that started before this attempt was created.
+              containerPreviousAllocatedTime += 
+                  (cFinishTime - (cAllocTime > creationTime ? cAllocTime : creationTime));
+            }
+          }
+          int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
+          int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime);
+          double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
+          
+          if (reUsesForVertex > 1) {
+            step.notes.add("Container ran multiple tasks for this vertex. ");
+            if (numWaves < 1) {
+              // less than 1 wave total but still ran more than 1 on this container
+              step.notes.add("Vertex potentially seeing contention from other branches in the DAG. ");
+            }
+          }
+          if (containerPreviousAllocatedTime == 0) {
+            step.notes.add("Container newly allocated.");
+          } else {
+            if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
+              step.notes.add("Container was fully allocated");
+            } else {
+              step.notes.add("Container in use for " + 
+              SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
+                  SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + 
+                  " of allocation wait time");
+            }
+          }
+        }
+        // look for internal preemptions while attempt was waiting for allocation
+        for (TaskAttemptInfo a : preemptedAttempts) {
+          if (a.getTaskInfo().getVertexInfo().getVertexId()
+              .equals(attempt.getTaskInfo().getVertexInfo().getVertexId())) {
+            // ignore preempted attempts from the same vertex. ideally this should look at priority, but we don't have it
+            continue;
+          }
+          if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime) {
+            // found an attempt that was preempted within this time interval
+            step.notes.add("Potentially waited for preemption of " + a.getShortName());
+          }
+        }
+      }
+    }
+  }
+  
+  private double getWaves(int numTasks, int concurrency) {
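+    // e.g. 10 vertex tasks with an observed max concurrency of 4 -> 10/4 = 2.5 waves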
+    double numWaves = (numTasks*1.0) / concurrency;
+    numWaves = (double)Math.round(numWaves * 10d) / 10d; // round to 1 decimal place
+    return numWaves;
+  }
+  
+  private void analyzeWaves(DagInfo dag) {
+    for (int i = 0; i < criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i);
+      TaskAttemptInfo attempt = step.attempt;
+      if (step.getType() != EntityType.ATTEMPT) {
+        continue;
+      }
+      long creationTime = attempt.getCreationTime();
+      long finishTime = attempt.getFinishTime();
+
+      int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
+      if (numVertexTasks <= 1) {
+        continue;
+      }
+      int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime);
+      double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
+
+      step.notes.add("Vertex ran " + numVertexTasks
+          + " tasks in " + numWaves
+          + " waves with available concurrency of " + intervalMaxConcurrency);
+      if (numWaves > 1) {
+        if (numWaves % 1 < 0.5) {
+          // more than 1 wave needed and last wave is small
+          step.notes.add("Last partial wave did not use full concurrency. ");
+        }
+      }
+    }
+  }
+  
+  private void analyzeStragglers(DagInfo dag) {
+    long dagStartTime = dag.getStartTime();
+    long dagTime = dag.getFinishTime() - dagStartTime;
+    long totalAttemptCriticalTime = 0;
+    for (int i = 0; i < criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i);
+      totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
+      TaskAttemptInfo attempt = step.attempt;
+      if (step.getType() == EntityType.ATTEMPT) {
+        // analyze execution overhead
+        if (attempt.getLastDataEvents().size() > 1) {
+          // there were read errors. that could have delayed the attempt. ignore this
+          continue;
+        }
+        long avgPostDataExecutionTime = attempt.getTaskInfo().getVertexInfo()
+            .getAvgPostDataExecutionTimeInterval();
+        if (avgPostDataExecutionTime <= 0) {
+          continue;
+        }
+        long attemptExecTime = attempt.getPostDataExecutionTimeInterval();
+        if (avgPostDataExecutionTime * 1.25 < attemptExecTime) {
+          step.notes
+              .add("Potential straggler. Post Data Execution time " + 
+                  SVGUtils.getTimeStr(attemptExecTime)
+                  + " compared to vertex average of " + 
+                  SVGUtils.getTimeStr(avgPostDataExecutionTime));
+        }
+      }
+    }
+    System.out
+        .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
+            + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
+  }
+  
+  private void analyzeCriticalPath(DagInfo dag) {
+    if (!criticalPath.isEmpty()) {
+      determineConcurrency(dag);
+      analyzeStragglers(dag);
+      analyzeWaves(dag);
+      analyzeAllocationOverhead(dag);
+    }
+  }
+  
+  private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt,
+      long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) {
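+    // walk backwards from the last attempt to finish, following data, retry and init
+    // dependencies to each predecessor; tempCP collects the steps in reverse order and
+    // is flipped into criticalPath at the end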
+    List<CriticalPathStep> tempCP = Lists.newLinkedList();
+    if (lastAttempt != null) {
+      TaskAttemptInfo currentAttempt = lastAttempt;
+      CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, EntityType.DAG_COMMIT);
+      long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;
+
+      // add the commit step
+      if (dagInfo.getFinishTime() > 0) {
+        currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
+      } else {
+        // AM crashed and no DAG finish event was written
+        currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+      }
+      currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
+      currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
+      tempCP.add(currentStep);
+
+      while (true) {
+        Preconditions.checkState(currentAttempt != null);
+        Preconditions.checkState(currentAttemptStopCriticalPathTime > 0);
+        System.out.println(
+            "Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());
+        
+        currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT);
+        currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+
+        // consider the last data event seen immediately preceding the current critical path 
+        // stop time for this attempt
+        long currentStepLastDataEventTime = 0;
+        String currentStepLastDataTA = null;
+        DataDependencyEvent item = currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime);
+        if (item!=null) {
+          currentStepLastDataEventTime = item.getTimestamp();
+          currentStepLastDataTA = item.getTaskAttemptId();
+        }
+
+        // sanity check
+        for (CriticalPathStep previousStep : tempCP) {
+          if (previousStep.type == EntityType.ATTEMPT) {
+            if (previousStep.attempt.getTaskAttemptId().equals(currentAttempt.getTaskAttemptId())) {
+              // found a loop in the critical path.
+              // this should only happen when currentAttempt had read errors
+              List<DataDependencyEvent> dataEvents = currentAttempt.getLastDataEvents();
+              // both the original and the retry data events must have been received
+              Preconditions.checkState(dataEvents.size() > 1);
+              // the new event must be earlier than the last one
+              Preconditions.checkState(currentStepLastDataEventTime < dataEvents
+                  .get(dataEvents.size() - 1).getTimestamp());
+            }
+          }
+        }
+
+        tempCP.add(currentStep);
+  
+        // find the next attempt on the critical path
+        boolean dataDependency = false;
+        // find out predecessor dependency
+        if (currentStepLastDataEventTime > currentAttempt.getCreationTime()) {
+          dataDependency = true;
+        }
+  
+        long startCriticalPathTime = 0;
+        String nextAttemptId = null;
+        CriticalPathDependency reason = null;
+        if (dataDependency) {
+          // last data event was produced after the attempt was scheduled. use
+          // data dependency
+          // typically the case when scheduling ahead of time
+          System.out.println("Has data dependency");
+          if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
+            // there is a valid data causal TA. Use it.
+            nextAttemptId = currentStepLastDataTA;
+            reason = CriticalPathDependency.DATA_DEPENDENCY;
+            startCriticalPathTime = currentStepLastDataEventTime;
+            System.out.println("Using data dependency " + nextAttemptId);
+          } else {
+            // there is no valid data causal TA. This means data event came from the same vertex
+            VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo();
+            Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(),
+                "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event "
+                    + "TA is null for " + currentAttempt.getTaskAttemptId());
+            nextAttemptId = null;
+            reason = CriticalPathDependency.INIT_DEPENDENCY;
+            System.out.println("Using init dependency");
+          }
+        } else {
+          // attempt was scheduled after last data event. use scheduling dependency
+          // typically happens for retries
+          System.out.println("Has scheduling dependency");
+          if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
+            // there is a scheduling causal TA. Use it.
+            nextAttemptId = currentAttempt.getCreationCausalTA();
+            reason = CriticalPathDependency.RETRY_DEPENDENCY;
+            TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
+            // nextAttemptId is non-empty here; guard on the resolved attempt instead
+            if (nextAttempt != null) {
+              VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
+              VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
+              if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){
+                // cause from different vertex. Might be rerun to re-generate outputs
+                for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
+                  if (nextVertex.getVertexName().equals(outVertex.getVertexName())) {
+                    // next vertex is an output vertex
+                    reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
+                    break;
+                  }
+                }
+              }
+            }
+            if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
+              // rescheduled due to read error. start critical at read error report time.
+              // for now, use this attempt's creation time as a proxy for the read error report time
+              startCriticalPathTime = currentAttempt.getCreationTime();
+            } else {
+              // rescheduled due to own previous attempt failure
+              // we are critical when the previous attempt fails
+              Preconditions.checkState(nextAttempt != null);
+              Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals(
+                  currentAttempt.getTaskInfo().getTaskId()));
+              startCriticalPathTime = nextAttempt.getFinishTime();
+            }
+            System.out.println("Using scheduling dependency " + nextAttemptId);
+          } else {
+            // there is no scheduling causal TA.
+            if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
+              // there is a data event going to the vertex. Count the time between data event and
+              // creation time as Initializer/Manager overhead and follow data dependency
+              nextAttemptId = currentStepLastDataTA;
+              reason = CriticalPathDependency.DATA_DEPENDENCY;
+              startCriticalPathTime = currentStepLastDataEventTime;
+              long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime;
+              currentStep.notes
+                  .add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead));
+              System.out.println("Using data dependency " + nextAttemptId);
+            } else {
+              // there is no scheduling causal TA and no data event causal TA.
+              // the vertex has external input that sent the last data events
+              // or the vertex has external input but does not use events
+              // or the vertex has no external inputs or edges
+              nextAttemptId = null;
+              reason = CriticalPathDependency.INIT_DEPENDENCY;
+              System.out.println("Using init dependency");
+            }
+          }
+        }
+
+        currentStep.startCriticalPathTime = startCriticalPathTime;
+        currentStep.reason = reason;
+        
+        Preconditions.checkState(currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime);
+  
+        if (Strings.isNullOrEmpty(nextAttemptId)) {
+          Preconditions.checkState(reason.equals(CriticalPathDependency.INIT_DEPENDENCY));
+          Preconditions.checkState(startCriticalPathTime == 0);
+          // no predecessor attempt found. this is the last step in the critical path
+          // assume the attempt's critical path start time is when it was scheduled;
+          // before that is vertex initialization time
+          currentStep.startCriticalPathTime = currentStep.attempt.getCreationTime();
+          
+          // add vertex init step
+          long initStepStopCriticalTime = currentStep.startCriticalPathTime;
+          currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT);
+          currentStep.stopCriticalPathTime = initStepStopCriticalTime;
+          currentStep.startCriticalPathTime = dagInfo.getStartTime();
+          currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY;
+          tempCP.add(currentStep);
+          
+          // tempCP was built backwards; flip it into the final critical path
+          criticalPath.addAll(Lists.reverse(tempCP));
+          return;
+        }
+  
+        currentAttempt = attempts.get(nextAttemptId);
+        currentAttemptStopCriticalPathTime = startCriticalPathTime;
+      }
+    }
+  }
+  
+  @Override
+  public CSVResult getResult() throws TezException {
+    String[] headers = { "Entity", "PathReason", "Status", "CriticalStartTime", 
+        "CriticalStopTime", "Notes" };
+
+    CSVResult csvResult = new CSVResult(headers);
+    for (CriticalPathStep step : criticalPath) {
+      String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId()
+          : (step.getType() == EntityType.VERTEX_INIT
+              ? step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT"));
+      String [] record = {entity, step.getReason().name(), 
+          step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()), 
+          String.valueOf(step.getStopCriticalTime()),
+          Joiner.on(";").join(step.getNotes())};
+      csvResult.addRecord(record);
+    }
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "CriticalPathAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze critical path of the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return getConf();
+  }
+  
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args);
+    System.exit(res);
+  }
+
+}
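
For callers that want the reconstructed path itself rather than the CSV, the steps are
exposed via getCriticalPath() once analyze() has run. A minimal sketch, again assuming
a DagInfo parsed elsewhere and that TezAnalyzerBase supplies the standard Configured/Tool
plumbing (it is run via ToolRunner above); SVG output is disabled so no output directory
is needed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer;
    import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class CriticalPathWalker {
      // dagInfo is assumed to come from the history parser
      static void walk(DagInfo dagInfo) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(CriticalPathAnalyzer.DRAW_SVG, false); // skip SVG for this sketch
        CriticalPathAnalyzer analyzer = new CriticalPathAnalyzer();
        analyzer.setConf(conf);   // assumes the Configured plumbing from TezAnalyzerBase
        analyzer.analyze(dagInfo);
        for (CriticalPathStep step : analyzer.getCriticalPath()) {
          System.out.println(step.getType() + " via " + step.getReason() + ": "
              + (step.getStopCriticalTime() - step.getStartCriticalTime())
              + " ms on path; " + step.getNotes());
        }
      }
    }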