Posted to commits@tez.apache.org by rb...@apache.org on 2015/08/11 14:46:18 UTC

[1/2] tez git commit: TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master eadbfec44 -> ecd90dc1f


http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
index 8df40ba..a570493 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.analyzer.plugins;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.analyzer.Analyzer;
@@ -43,15 +44,21 @@ import java.util.Map;
  */
 public class ShuffleTimeAnalyzer implements Analyzer {
 
-  private static final String SHUFFLE_TIME_RATIO = "tez.shuffle-time-analyzer.shuffle.ratio";
-  private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f;
+  /**
+   * Ratio of (total time taken by task - shuffle time) to (total time taken by task).
+   */
+  private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio";
+  private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f;
 
+  /**
+   * Minimum number of records that need to come in as reduce input records.
+   */
   private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
   private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
 
   private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
       "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
-      "Time taken to receive all events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
+      "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
       "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
       "SHUFFLE_BYTES_DISK_DIRECT" };
 
@@ -59,15 +66,15 @@ public class ShuffleTimeAnalyzer implements Analyzer {
 
   private final Configuration config;
 
-  private final float shuffleTimeRatio;
+  private final float realWorkDoneRatio;
   private final long minShuffleRecords;
 
 
   public ShuffleTimeAnalyzer(Configuration config) {
     this.config = config;
 
-    shuffleTimeRatio = config.getFloat
-        (SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT);
+    realWorkDoneRatio = config.getFloat
+        (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
     minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
   }
 
@@ -105,15 +112,20 @@ public class ShuffleTimeAnalyzer implements Analyzer {
             result.add(counterGroupName);
 
             //Real work done in the task
-            long timeTakenForRealWork = attemptInfo.getTimeTaken() -
-                Long.parseLong(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName,
-                    attemptInfo));
-
             String comments = "";
-            if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < shuffleTimeRatio) {
-              comments = "Time taken in shuffle is more than the actual work being done in task. "
-                  + " Check if source/destination machine is a slow node. Check if merge phase "
-                  + "time is more to understand disk bottlenecks in this node.  Check for skew";
+            String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME,
+                counterGroupName, attemptInfo);
+            String timeTakenForRealWork = "";
+            if (!Strings.isNullOrEmpty(mergePhaseTime)) {
+              long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime);
+
+              if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio) {
+                comments = "Time taken in shuffle is more than the actual work being done in task. "
+                    + " Check if source/destination machine is a slow node. Check if merge phase "
+                    + "time is more to understand disk bottlenecks in this node.  Check for skew";
+              }
+
+              timeTakenForRealWork = Long.toString(realWorkDone);
             }
             result.add(comments);
 
@@ -122,13 +134,14 @@ public class ShuffleTimeAnalyzer implements Analyzer {
             result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal));
             result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));
 
+            result.add(Long.toString(attemptInfo.getTimeTaken()));
+
             //Total time taken for receiving all events from source tasks
             result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo));
 
-
-            result.add(Long.toString(timeTakenForRealWork));
+            result.add(timeTakenForRealWork);
 
             result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo));
@@ -150,11 +163,16 @@ public class ShuffleTimeAnalyzer implements Analyzer {
    * @return String
    */
   private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
-    long firstEventReceived = Long.parseLong(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
-        counterGroupName, attemptInfo));
-    long lastEventReceived = Long.parseLong(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
-        counterGroupName, attemptInfo));
-    return Long.toString(lastEventReceived - firstEventReceived);
+    String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+    String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+
+    if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived)) {
+      return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived));
+    } else {
+      return "";
+    }
   }
 
   private String getCounterValue(TaskCounter counter, String counterGroupName,
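
For reference, the real-work check introduced in this file reduces to the following standalone sketch (the timings are hypothetical stand-ins for attemptInfo.getTimeTaken() and the MERGE_PHASE_TIME counter; the 0.5f default mirrors REAL_WORK_DONE_RATIO_DEFAULT):

    public class RealWorkRatioSketch {
      public static void main(String[] args) {
        long timeTaken = 20000L;       // total attempt runtime in ms (hypothetical)
        long mergePhaseTime = 12000L;  // MERGE_PHASE_TIME counter value in ms (hypothetical)
        float realWorkDoneRatio = 0.5f;

        long realWorkDone = timeTaken - mergePhaseTime;
        // Flag the attempt when less than half of its runtime went into real work,
        // i.e. the shuffle/merge phase dominated the attempt.
        if ((realWorkDone * 1.0f / timeTaken) < realWorkDoneRatio) {
          System.out.println("Shuffle-dominated attempt: realWorkDone=" + realWorkDone + " ms");
        }
      }
    }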

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
index 8152344..f09380d 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -57,6 +57,10 @@ import java.util.Map;
  */
 public class SkewAnalyzer implements Analyzer {
 
+  /**
+   * Number of bytes sent as shuffle bytes from the source. If it is below this threshold,
+   * the attempt is not considered for analysis.
+   */
   private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
       + ".bytes.per.source";
   private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;
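
The threshold above is a plain Hadoop Configuration knob; overriding the 900 MB default could look like the following sketch (only the key string comes from the patch, the value is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class SkewAnalyzerConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Attempts that shuffled fewer bytes than this are skipped by the analyzer.
        conf.setLong("tez.skew-analyzer.shuffle.bytes.per.source", 512L * 1024 * 1024);
        System.out.println(conf.get("tez.skew-analyzer.shuffle.bytes.per.source"));
      }
    }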

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
index 7c7f5c0..1a8d9d3 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -41,7 +41,7 @@ import java.util.List;
 public class SlowTaskIdentifier implements Analyzer {
 
   private static final String[] headers = { "vertexName", "taskAttemptId",
-      "Node", "taskDuration", "Status",
+      "Node", "taskDuration", "Status", "diagnostics",
       "NoOfInputs" };
 
   private final CSVResult csvResult;
@@ -72,14 +72,21 @@ public class SlowTaskIdentifier implements Analyzer {
       }
     });
 
-    int limit = config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT);
-    for(int i=0;i<limit;i++) {
+    int limit = Math.min(taskAttempts.size(),
+        Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+
+    if (limit == 0) {
+      return;
+    }
+
+    for (int i = 0; i < limit - 1; i++) {
       List<String> record = Lists.newLinkedList();
       record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName());
       record.add(taskAttempts.get(i).getTaskAttemptId());
       record.add(taskAttempts.get(i).getContainer().getHost());
       record.add(taskAttempts.get(i).getTimeTaken() + "");
       record.add(taskAttempts.get(i).getStatus());
+      record.add(taskAttempts.get(i).getDiagnostics());
       record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + "");
 
       csvResult.addRecord(record.toArray(new String[record.size()]));

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
index b7fca0b..c8d9695 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 
 import java.util.List;
@@ -41,7 +42,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
   private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
       "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom",
-      "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+      "75thPercentile", "95thPercentile", "98thPercentile", "Median",
       "observation", "comments" };
 
   private final CSVResult csvResult = new CSVResult(headers);
@@ -50,8 +51,27 @@ public class SlowestVertexAnalyzer implements Analyzer {
   private final MetricRegistry metrics = new MetricRegistry();
   private Histogram taskAttemptRuntimeHistorgram;
 
+  private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime";
+  private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000;
+
+  private final long vertexRuntimeThreshold;
+
   public SlowestVertexAnalyzer(Configuration config) {
     this.config = config;
+    this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
+        MAX_VERTEX_RUNTIME_DEFAULT));
+
+  }
+
+  private long getTaskRuntime(VertexInfo vertexInfo) {
+    TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart();
+    TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish();
+
+    DagInfo dagInfo = vertexInfo.getDagInfo();
+    long totalTime = ((lastTaskToFinish == null) ?
+        dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) -
+        ((firstTaskToStart == null) ? dagInfo.getStartTime() : firstTaskToStart.getStartTime());
+    return totalTime;
   }
 
   @Override
@@ -59,9 +79,13 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
     for (VertexInfo vertexInfo : dagInfo.getVertices()) {
       String vertexName = vertexInfo.getVertexName();
-      long totalTime = vertexInfo.getTimeTaken();
+      if (vertexInfo.getFirstTaskToStart()  == null || vertexInfo.getLastTaskToFinish() == null) {
+        continue;
+      }
+
+      long totalTime = getTaskRuntime(vertexInfo);
 
-      long max = Long.MIN_VALUE;
+      long slowestLastEventTime = Long.MIN_VALUE;
       String maxSourceName = "";
       taskAttemptRuntimeHistorgram = metrics.histogram(vertexName);
 
@@ -81,10 +105,8 @@ public class SlowestVertexAnalyzer implements Analyzer {
             continue;
           }
           //Find the slowest last event received
-          if (entry.getValue().getValue() > max) {
-            //w.r.t vertex start time.
-            max =(attemptInfo.getStartTimeInterval() +  entry.getValue().getValue()) -
-                (vertexInfo.getStartTimeInterval());
+          if (entry.getValue().getValue() > slowestLastEventTime) {
+            slowestLastEventTime = entry.getValue().getValue();
             maxSourceName = entry.getKey();
           }
         }
@@ -104,9 +126,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
           }
           //Find the slowest last event received
           if (entry.getValue().getValue() > shuffleMax) {
-            //w.r.t vertex start time.
-            shuffleMax =(attemptInfo.getStartTimeInterval() +  entry.getValue().getValue()) -
-                (vertexInfo.getStartTimeInterval());
+            shuffleMax = entry.getValue().getValue();
             shuffleMaxSource = entry.getKey();
           }
         }
@@ -120,9 +140,10 @@ public class SlowestVertexAnalyzer implements Analyzer {
       record.add(totalTime + "");
       record.add(Math.max(0, shuffleMax) + "");
       record.add(shuffleMaxSource);
-      record.add(Math.max(0, max) + "");
+      record.add(Math.max(0, slowestLastEventTime) + "");
       record.add(maxSourceName);
-      record.add(Math.max(0,(totalTime - max)) + "");
+      //Finding out real_work done at vertex level might be meaningless (as it is quite possible
+      // that it went into starvation).
 
       StringBuilder sb = new StringBuilder();
       double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
@@ -145,7 +166,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
       if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
         if ((shuffleMax * 1.0f / totalTime) > 0.5) {
-          if ((max * 1.0f / totalTime) > 0.5) {
+          if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) {
             comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
                 + " event received";
           } else {
@@ -153,8 +174,9 @@ public class SlowestVertexAnalyzer implements Analyzer {
                 "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
           }
         } else {
-          if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later.
-            comments = "Concentrate on this vertex (totalTime > 10 seconds)";
+          if (totalTime > vertexRuntimeThreshold) { //greater than X seconds.
+            comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold
+                + " seconds)";
           }
         }
       }
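
The vertex runtime used above is the wall-clock span from the first task start to the last task finish, compared against the new threshold in milliseconds; a self-contained restatement with hypothetical timestamps:

    public class VertexRuntimeSketch {
      public static void main(String[] args) {
        long firstTaskStartTime = 1439300000000L;  // epoch ms (hypothetical)
        long lastTaskFinishTime = 1439300150000L;  // epoch ms (hypothetical)
        long vertexRuntimeThreshold = 100000L;     // MAX_VERTEX_RUNTIME_DEFAULT (ms)

        long totalTime = lastTaskFinishTime - firstTaskStartTime;  // 150000 ms here
        if (totalTime > vertexRuntimeThreshold) {
          System.out.println("Concentrate on this vertex (totalTime > "
              + vertexRuntimeThreshold + " ms)");
        }
      }
    }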

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
index c650104..83b1bb0 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -49,12 +49,21 @@ public class SpillAnalyzerImpl implements Analyzer {
 
   private final CSVResult csvResult;
 
-  private static long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024l;
+  /**
+   * Minimum output bytes that should be churned out by a task.
+   */
+  private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes"
+      + ".threshold";
+  private static long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1 * 1024 * 1024 * 1024l;
+
+  private final long minOutputBytesPerTask;
 
   private final Configuration config;
 
   public SpillAnalyzerImpl(Configuration config) {
     this.config = config;
+    minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
+        OUTPUT_BYTES_THRESHOLD_DEFAULT));
     this.csvResult = new CSVResult(headers);
   }
 
@@ -83,7 +92,7 @@ public class SpillAnalyzerImpl implements Analyzer {
           long outputRecords = outputRecordsMap.get(source).getValue();
           long spilledRecords = spilledRecordsMap.get(source).getValue();
 
-          if (spillCount > 1 && outBytes > OUTPUT_BYTES_THRESHOLD) {
+          if (spillCount > 1 && outBytes > minOutputBytesPerTask) {
             List<String> recorList = Lists.newLinkedList();
             recorList.add(vertexName);
             recorList.add(attemptInfo.getTaskAttemptId());
@@ -95,7 +104,7 @@ public class SpillAnalyzerImpl implements Analyzer {
             recorList.add(outputRecords + "");
             recorList.add(spilledRecords + "");
             recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
-                + ", try increasing container size.");
+                + ". Try increasing container size.");
 
             csvResult.addRecord(recorList.toArray(new String[recorList.size()]));
           }
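
Stated on its own, the reporting condition combines the spill count with the new output-bytes floor (a sketch; the counter values are hypothetical):

    public class SpillCheckSketch {
      public static void main(String[] args) {
        long minOutputBytesPerTask = 1L * 1024 * 1024 * 1024;  // 1 GB default
        int spillCount = 3;                       // additional spill count (hypothetical)
        long outBytes = 2L * 1024 * 1024 * 1024;  // OUTPUT_BYTES (hypothetical)

        // Report only tasks that spilled more than once AND produced enough
        // output for the extra spills to matter.
        if (spillCount > 1 && outBytes > minOutputBytesPerTask) {
          System.out.println("Consider increasing tez.runtime.io.sort.mb."
              + " Try increasing container size.");
        }
      }
    }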

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
new file mode 100644
index 0000000..c07ff83
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.TreeMultiset;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Analyze concurrent tasks running in every vertex at regular intervals.
+ */
+public class TaskConcurrencyAnalyzer implements Analyzer {
+
+  private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
+
+  private final CSVResult csvResult;
+  private final Configuration config;
+
+  public TaskConcurrencyAnalyzer(Configuration conf) {
+    this.csvResult = new CSVResult(headers);
+    this.config = conf;
+  }
+
+  private enum EventType {START, FINISH}
+
+  static class TimeInfo {
+    EventType eventType;
+    long timestamp;
+    int concurrentTasks;
+
+    public TimeInfo(EventType eventType, long timestamp) {
+      this.eventType = eventType;
+      this.timestamp = timestamp;
+    }
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    //For each vertex find the concurrent tasks running at any point
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      List<TaskAttemptInfo> taskAttempts =
+          Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null));
+
+      String vertexName = vertexInfo.getVertexName();
+
+      /**
+       * - Get a sorted multi-set of timestamps (S1, S2,...E1, E2..). It is possible for
+       * multiple tasks to start/end at the same time.
+       * - Walk through the set
+       * - Increment concurrent tasks when a start event is encountered
+       * - Decrement concurrent tasks when a finish event is encountered
+       */
+      TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() {
+        @Override public int compare(TimeInfo o1, TimeInfo o2) {
+          return (o1.timestamp < o2.timestamp) ? -1 :
+              ((o1.timestamp == o2.timestamp) ? 0 : 1);
+        }
+      });
+
+      for (TaskAttemptInfo attemptInfo : taskAttempts) {
+        TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime());
+        TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime());
+
+        timeInfoSet.add(startTimeInfo);
+        timeInfoSet.add(stopTimeInfo);
+      }
+
+      //Compute concurrent tasks in the list now.
+      int concurrentTasks = 0;
+      for(TimeInfo timeInfo : timeInfoSet.elementSet()) {
+        switch (timeInfo.eventType) {
+        case START:
+          concurrentTasks += timeInfoSet.count(timeInfo);
+          break;
+        case FINISH:
+          concurrentTasks -= timeInfoSet.count(timeInfo);
+          break;
+        default:
+          break;
+        }
+        timeInfo.concurrentTasks = concurrentTasks;
+        addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks);
+      }
+    }
+  }
+
+  private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
+    String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
+    csvResult.addRecord(record);
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "TaskConcurrencyAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze how many tasks were running in every vertex at given point in time. This "
+        + "would be helpful in understanding whether any starvation was there or not.";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}
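
The counting walk above can be restated with plain timestamps: +1 at each start, -1 at each finish, then a prefix sum over the sorted event times (events sharing a timestamp are netted here). A minimal sketch with hypothetical attempt windows:

    import java.util.Map;
    import java.util.TreeMap;

    public class ConcurrencySweepSketch {
      public static void main(String[] args) {
        // Hypothetical task attempt windows: {startTime, finishTime} in ms.
        long[][] attempts = { {0, 10}, {2, 8}, {2, 12}, {11, 15} };

        TreeMap<Long, Integer> delta = new TreeMap<>();
        for (long[] a : attempts) {
          delta.merge(a[0], 1, Integer::sum);   // start event
          delta.merge(a[1], -1, Integer::sum);  // finish event
        }
        int running = 0;
        for (Map.Entry<Long, Integer> e : delta.entrySet()) {
          running += e.getValue();
          System.out.println("t=" + e.getKey() + " concurrentTasksRunning=" + running);
        }
      }
    }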

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
new file mode 100644
index 0000000..4a582bb
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.plutext.jaxb.svg11.Line;
+import org.plutext.jaxb.svg11.ObjectFactory;
+import org.plutext.jaxb.svg11.Svg;
+import org.plutext.jaxb.svg11.Title;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.namespace.QName;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.TreeSet;
+
+public class SVGUtils {
+
+  private static final String UTF8 = "UTF-8";
+
+  private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class);
+
+
+  private final ObjectFactory objectFactory;
+  private final Svg svg;
+  private final QName titleName = new QName("title");
+
+  private static int MAX_DAG_RUNTIME = 0;
+  private static final int SCREEN_WIDTH = 1800;
+
+  private final DagInfo dagInfo;
+
+  //Gap between various components
+  private static final int DAG_GAP = 70;
+  private static final int VERTEX_GAP = 50;
+  private static final int TASK_GAP = 5;
+  private static final int STROKE_WIDTH = 5;
+
+  //To compute the size of the graph.
+  private long MIN_X = Long.MAX_VALUE;
+  private long MAX_X = Long.MIN_VALUE;
+
+  private int x1 = 0;
+  private int y1 = 0;
+  private int y2 = 0;
+
+  public SVGUtils(DagInfo dagInfo) {
+    this.dagInfo = dagInfo;
+    this.objectFactory = new ObjectFactory();
+    this.svg = objectFactory.createSvg();
+  }
+
+  private Line createLine(int x1, int y1, int x2, int y2) {
+    Line line = objectFactory.createLine();
+    line.setX1(scaleDown(x1) + "");
+    line.setY1(y1 + "");
+    line.setX2(scaleDown(x2) + "");
+    line.setY2(y2 + "");
+    return line;
+  }
+
+  private Title createTitle(String msg) {
+    Title t = objectFactory.createTitle();
+    t.setContent(msg);
+    return t;
+  }
+
+  private Title createTitleForVertex(VertexInfo vertex) {
+    String titleStr = vertex.getVertexName() + ":"
+        + (vertex.getFinishTimeInterval())
+        + " ms, RelativeTimeToDAG:"
+        + (vertex.getInitTime() - this.dagInfo.getStartTime())
+        + " ms, counters:" + vertex.getTezCounters();
+    Title title = createTitle(titleStr);
+    return title;
+  }
+
+  private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) {
+    String titleStr = "RelativeTimeToVertex:"
+        + (taskAttemptInfo.getStartTime() -
+        taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) +
+        " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters();
+    Title title = createTitle(titleStr);
+    return title;
+  }
+
+  /**
+   * Draw DAG from dagInfo
+   *
+   * @param dagInfo
+   */
+  private void drawDAG(DagInfo dagInfo) {
+    Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms");
+    int duration = (int) dagInfo.getFinishTimeInterval();
+    MAX_DAG_RUNTIME = duration;
+    MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X);
+    MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X);
+    Line line = createLine(x1, y1, x1 + duration, y2);
+    line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title));
+    line.setStyle("stroke: black; stroke-width:20");
+    line.setOpacity("0.3");
+    svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
+    drawVertex();
+  }
+
+  private Collection<VertexInfo> getSortedVertices() {
+    Collection<VertexInfo> vertices = this.dagInfo.getVertices();
+    // Add corresponding vertex details
+    TreeSet<VertexInfo> vertexSet = new TreeSet<VertexInfo>(
+        new Comparator<VertexInfo>() {
+          @Override
+          public int compare(VertexInfo o1, VertexInfo o2) {
+            return (int) (o1.getFirstTaskStartTimeInterval() - o2.getFirstTaskStartTimeInterval());
+          }
+        });
+    vertexSet.addAll(vertices);
+    return  vertexSet;
+  }
+
+  private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) {
+    Collection<TaskInfo> tasks = vertexInfo.getTasks();
+    // Add corresponding task details
+    TreeSet<TaskInfo> taskSet = new TreeSet<TaskInfo>(new Comparator<TaskInfo>() {
+      @Override
+      public int compare(TaskInfo o1, TaskInfo o2) {
+        return (int) (o1.getSuccessfulTaskAttempt().getStartTimeInterval()
+            - o2.getSuccessfulTaskAttempt().getStartTimeInterval());
+      }
+    });
+    taskSet.addAll(tasks);
+    return taskSet;
+  }
+
+  /**
+   * Draw the vertices
+   *
+   */
+  public void drawVertex() {
+    Collection<VertexInfo> vertices = getSortedVertices();
+    for (VertexInfo vertex : vertices) {
+      //Set vertex start time as the one when its first task attempt started executing
+      x1 = (int) vertex.getStartTimeInterval();
+      y1 += VERTEX_GAP;
+      int duration = ((int) (vertex.getTimeTaken()));
+      Line line = createLine(x1, y1, x1 + duration, y1);
+      line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH);
+      line.setOpacity("0.3");
+
+      Title vertexTitle = createTitleForVertex(vertex);
+      line.getSVGDescriptionClass().add(
+          new JAXBElement<Title>(titleName, Title.class, vertexTitle));
+      svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
+      // For each vertex, draw the tasks
+      drawTask(vertex);
+    }
+    x1 = x1 + (int) dagInfo.getFinishTimeInterval();
+    y1 = y1 + DAG_GAP;
+    y2 = y1;
+  }
+
+  /**
+   * Draw tasks
+   *
+   * @param vertex
+   */
+  public void drawTask(VertexInfo vertex) {
+    Collection<TaskInfo> tasks = getSortedTasks(vertex);
+    for (TaskInfo task : tasks) {
+      for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) {
+        x1 = (int) taskAttemptInfo.getStartTimeInterval();
+        y1 += TASK_GAP;
+        int duration = (int) taskAttemptInfo.getTimeTaken();
+        Line line = createLine(x1, y1, x1 + duration, y1);
+        String color =
+            taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name())
+                ? "green" : "red";
+        line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH);
+        Title title = createTitleForTaskAttempt(taskAttemptInfo);
+        line.getSVGDescriptionClass().add(
+            new JAXBElement<Title>(titleName, Title.class, title));
+        svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass()
+            .add(line);
+      }
+    }
+  }
+
+  /**
+   * Convert DAG to graph
+   *
+   * @throws java.io.IOException
+   * @throws javax.xml.bind.JAXBException
+   */
+  public void saveAsSVG(String fileName) throws IOException, JAXBException {
+    drawDAG(dagInfo);
+    svg.setHeight("" + y2);
+    svg.setWidth("" + (MAX_X - MIN_X));
+    String tempFileName = System.nanoTime() + ".svg";
+    File file = new File(tempFileName);
+    JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class);
+    Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
+    jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+    jaxbMarshaller.marshal(svg, file);
+    //TODO: dirty workaround to get rid of XMLRootException issue
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(new FileInputStream(file), UTF8));
+    BufferedWriter writer = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(fileName), UTF8));
+    try {
+      while (reader.ready()) {
+        String line = reader.readLine();
+        if (line != null) {
+          line = line.replaceAll(
+              " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"", "");
+          writer.write(line);
+          writer.newLine();
+        }
+      }
+    } finally {
+      IOUtils.closeQuietly(reader);
+      IOUtils.closeQuietly(writer);
+      if (file.exists()) {
+        boolean deleted = file.delete();
+        LOG.debug("Deleted {}" + file.getAbsolutePath());
+      }
+    }
+  }
+
+  private float scaleDown(int len) {
+    return (len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH;
+  }
+}
\ No newline at end of file
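
Typical use of the class above, assuming a parsed DagInfo is already at hand (a sketch; obtaining the DagInfo via a history parser is elided):

    import org.apache.tez.analyzer.utils.SVGUtils;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class SvgSketch {
      // Writes a swimlane-style SVG of the DAG's vertices and task attempts.
      static void writeSvg(DagInfo dagInfo) throws Exception {
        new SVGUtils(dagInfo).saveAsSVG("dag_swimlane.svg");
      }
    }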

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
new file mode 100644
index 0000000..8bcf265
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.utils;
+
+import com.sun.istack.Nullable;
+import org.apache.tez.dag.utils.Graph;
+import org.apache.tez.history.parser.datamodel.AdditionalInputOutputDetails;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Utils {
+
+  private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+
+  public static String getShortClassName(String className) {
+    int pos = className.lastIndexOf(".");
+    if (pos != -1 && pos < className.length() - 1) {
+      return className.substring(pos + 1);
+    }
+    return className;
+  }
+
+  public static String sanitizeLabelForViz(String label) {
+    Matcher m = sanitizeLabelPattern.matcher(label);
+    return m.replaceAll("_");
+  }
+
+  public static void generateDAGVizFile(DagInfo dagInfo, String fileName,
+      @Nullable List<String> criticalVertices) throws IOException {
+    Graph graph = new Graph(sanitizeLabelForViz(dagInfo.getName()));
+
+    for (VertexInfo v : dagInfo.getVertices()) {
+      String nodeLabel = sanitizeLabelForViz(v.getVertexName())
+          + "[" + getShortClassName(v.getProcessorClassName()
+          + ", tasks=" + v.getTasks().size() + ", time=" + v.getTimeTaken() +" ms]");
+      Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getVertexName()), nodeLabel);
+
+      boolean criticalVertex = (criticalVertices != null) ? criticalVertices.contains(v
+          .getVertexName()) : false;
+      if (criticalVertex) {
+        n.setColor("red");
+      }
+
+
+      for (AdditionalInputOutputDetails input : v.getAdditionalInputInfoList()) {
+        Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName())
+            + "_" + sanitizeLabelForViz(input.getName()));
+        inputNode.setLabel(sanitizeLabelForViz(v.getVertexName())
+            + "[" + sanitizeLabelForViz(input.getName()) + "]");
+        inputNode.setShape("box");
+        inputNode.addEdge(n, "Input name=" + input.getName()
+            + " [inputClass=" + getShortClassName(input.getClazz())
+            + ", initializer=" + getShortClassName(input.getInitializer()) + "]");
+      }
+      for (AdditionalInputOutputDetails output : v.getAdditionalOutputInfoList()) {
+        Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName())
+            + "_" + sanitizeLabelForViz(output.getName()));
+        outputNode.setLabel(sanitizeLabelForViz(v.getVertexName())
+            + "[" + sanitizeLabelForViz(output.getName()) + "]");
+        outputNode.setShape("box");
+        n.addEdge(outputNode, "Output name=" + output.getName()
+            + " [outputClass=" + getShortClassName(output.getClazz())
+            + ", committer=" + getShortClassName(output.getInitializer()) + "]");
+      }
+
+    }
+
+    for (EdgeInfo e : dagInfo.getEdges()) {
+      Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName()));
+      n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())),
+          "[input=" + getShortClassName(e.getEdgeSourceClass())
+              + ", output=" + getShortClassName(e.getEdgeDestinationClass())
+              + ", dataMovement=" + e.getDataMovementType().trim() + "]");
+    }
+
+    graph.save(fileName);
+  }
+}
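
Typical use of the helper above (a sketch; passing null for criticalVertices skips the red highlighting):

    import org.apache.tez.analyzer.utils.Utils;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class DagVizSketch {
      // Emits a Graphviz DOT file describing the DAG's vertices and edges.
      static void writeDot(DagInfo dagInfo) throws java.io.IOException {
        Utils.generateDAGVizFile(dagInfo, "dag.dot", null);
      }
    }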


[2/2] tez git commit: TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)

Posted by rb...@apache.org.
TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ecd90dc1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ecd90dc1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ecd90dc1

Branch: refs/heads/master
Commit: ecd90dc1f79a4b1614174b75030b85feb8842793
Parents: eadbfec
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Aug 11 18:19:55 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Aug 11 18:19:55 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/dag/utils/Graph.java    |  15 +-
 .../tez/history/parser/SimpleHistoryParser.java | 239 ++++++
 .../history/parser/datamodel/VertexInfo.java    |  65 +-
 .../apache/tez/history/TestATSFileParser.java   | 587 --------------
 .../apache/tez/history/TestHistoryParser.java   | 785 +++++++++++++++++++
 tez-tools/analyzers/job-analyzer/pom.xml        |   9 +
 .../java/org/apache/tez/analyzer/CSVResult.java |   5 +-
 .../analyzer/plugins/CriticalPathAnalyzer.java  |  44 +-
 .../tez/analyzer/plugins/LocalityAnalyzer.java  |   3 +-
 .../analyzer/plugins/ShuffleTimeAnalyzer.java   |  60 +-
 .../tez/analyzer/plugins/SkewAnalyzer.java      |   4 +
 .../analyzer/plugins/SlowTaskIdentifier.java    |  13 +-
 .../analyzer/plugins/SlowestVertexAnalyzer.java |  52 +-
 .../tez/analyzer/plugins/SpillAnalyzerImpl.java |  15 +-
 .../plugins/TaskConcurrencyAnalyzer.java        | 138 ++++
 .../org/apache/tez/analyzer/utils/SVGUtils.java | 264 +++++++
 .../org/apache/tez/analyzer/utils/Utils.java    | 100 +++
 18 files changed, 1751 insertions(+), 648 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b37eb9e..3de9fb7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@ INCOMPATIBLE CHANGES
   TEZ-2699. Internalize strings in ATF parser
 
 ALL CHANGES:
+  TEZ-2692. bugfixes & enhancements related to job parser and analyzer.
   TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
   TEZ-2630. TezChild receives IP address instead of FQDN.
   TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
index 6de9c59..eb2bd41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
@@ -62,6 +62,7 @@ public class Graph {
     List<Edge> outs;
     String label;
     String shape;
+    String color;
 
     public Node(String id) {
       this(id, null);
@@ -104,6 +105,10 @@ public class Graph {
     public void setShape(String shape) {
       this.shape = shape;
     }
+
+    public void setColor(String color) {
+      this.color = color;
+    }
   }
 
   private String name;
@@ -196,17 +201,19 @@ public class Graph {
     for (Node n : nodes) {
       if (n.shape != null && !n.shape.isEmpty()) {
         sb.append(String.format(
-            "%s%s [ label = %s, shape = %s ];",
+            "%s%s [ label = %s, shape = %s , color= %s];",
             indent,
             wrapSafeString(n.getUniqueId()),
             wrapSafeString(n.getLabel()),
-            wrapSafeString(n.shape)));
+            wrapSafeString(n.shape),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       } else {
         sb.append(String.format(
-            "%s%s [ label = %s ];",
+            "%s%s [ label = %s , color= %s ];",
             indent,
             wrapSafeString(n.getUniqueId()),
-            wrapSafeString(n.getLabel())));
+            wrapSafeString(n.getLabel()),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       }
       sb.append(System.getProperty("line.separator"));
       List<Edge> combinedOuts = combineEdges(n.outs);

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
new file mode 100644
index 0000000..09c010a
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Parser utility to parse data generated by SimpleHistoryLogging into the in-memory datamodel
+ * provided in org.apache.tez.history.parser.datamodel
+ * <p/>
+ * <p/>
+ * Most of the information should be available. Minor info like VersionInfo may not be available,
+ * as it is not captured in SimpleHistoryLogging.
+ */
+public class SimpleHistoryParser extends BaseParser {
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleHistoryParser.class);
+  private static final String UTF8 = "UTF-8";
+  private final File historyFile;
+
+
+  public SimpleHistoryParser(File historyFile) {
+    super();
+    Preconditions.checkArgument(historyFile.exists(), historyFile + " does not exist");
+    this.historyFile = historyFile;
+  }
+
+  /**
+   * Get in-memory representation of DagInfo
+   *
+   * @return DagInfo
+   * @throws TezException
+   */
+  public DagInfo getDAGData(String dagId) throws TezException {
+    try {
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId");
+      dagId = dagId.trim();
+      parseContents(historyFile, dagId);
+      linkParsedContents();
+      return dagInfo;
+    } catch (IOException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    } catch (JSONException e) {
+      LOG.error("Error in parsing DAG ", e);
+      throw new TezException(e);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, JSONObject destination) throws JSONException {
+    if (source == null || destination == null) {
+      return;
+    }
+    for (Iterator it = source.keys(); it.hasNext(); ) {
+      String key = (String) it.next();
+      Object val = source.get(key);
+      destination.put(key, val);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, String entityName,
+      Map<String, JSONObject> destMap) throws JSONException {
+    JSONObject destinationJson = destMap.get(entityName);
+    JSONObject destOtherInfo = destinationJson.getJSONObject(Constants.OTHER_INFO);
+    populateOtherInfo(source, destOtherInfo);
+  }
+
+  private void parseContents(File historyFile, String dagId)
+      throws JSONException, FileNotFoundException, TezException {
+    Scanner scanner = new Scanner(historyFile, UTF8);
+    scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR);
+    JSONObject dagJson = null;
+    Map<String, JSONObject> vertexJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> taskJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> attemptJsonMap = Maps.newHashMap();
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    while (scanner.hasNext()) {
+      String line = scanner.next();
+      JSONObject jsonObject = new JSONObject(line);
+      String entity = jsonObject.getString(Constants.ENTITY);
+      String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
+      switch (entityType) {
+      case Constants.TEZ_DAG_ID:
+        if (!dagId.equals(entity)) {
+          LOG.warn(dagId + " does not match " + entity);
+          continue;
+        }
+        // Club all DAG-related information together (DAG_INIT, DAG_FINISH etc.). Each of them
+        // would have a set of entities in otherinfo (e.g. vertex mapping, dagPlan, start/finish
+        // time etc.).
+        if (dagJson == null) {
+          dagJson = jsonObject;
+        }
+        JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, dagOtherInfo);
+        break;
+      case Constants.TEZ_VERTEX_ID:
+        String vertexName = entity;
+        TezVertexID tezVertexID = TezVertexID.fromString(vertexName);
+        if (!tezDAGID.equals(tezVertexID.getDAGId())) {
+          LOG.warn(vertexName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!vertexJsonMap.containsKey(vertexName)) {
+          vertexJsonMap.put(vertexName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, vertexName, vertexJsonMap);
+        break;
+      case Constants.TEZ_TASK_ID:
+        String taskName = entity;
+        TezTaskID tezTaskID = TezTaskID.fromString(taskName);
+        if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) {
+          LOG.warn(taskName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!taskJsonMap.containsKey(taskName)) {
+          taskJsonMap.put(taskName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, taskName, taskJsonMap);
+        break;
+      case Constants.TEZ_TASK_ATTEMPT_ID:
+        String taskAttemptName = entity;
+        TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(taskAttemptName);
+        if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) {
+          LOG.warn(taskAttemptName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!attemptJsonMap.containsKey(taskAttemptName)) {
+          attemptJsonMap.put(taskAttemptName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap);
+        break;
+      default:
+        break;
+      }
+    }
+    scanner.close();
+    if (dagJson != null) {
+      this.dagInfo = DagInfo.create(dagJson);
+    } else {
+      LOG.error("Dag is not yet parsed. Looks like partial file.");
+      throw new TezException(
+          "Please provide a valid/complete history log file containing " + dagId);
+    }
+    for (JSONObject jsonObject : vertexJsonMap.values()) {
+      VertexInfo vertexInfo = VertexInfo.create(jsonObject);
+      this.vertexList.add(vertexInfo);
+      LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
+    }
+    for (JSONObject jsonObject : taskJsonMap.values()) {
+      TaskInfo taskInfo = TaskInfo.create(jsonObject);
+      this.taskList.add(taskInfo);
+      LOG.debug("Parsed task {}", taskInfo.getTaskId());
+    }
+    for (JSONObject jsonObject : attemptJsonMap.values()) {
+      /**
+       * For converting SimpleHistoryLogging to in-memory representation
+       *
+       * We need to get "relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
+       * "entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
+       * "entitytype":"containerId"} and populate it in otherInfo object so that in-memory
+       * representation can parse it correctly
+       */
+      JSONObject subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+          .optJSONObject(0);
+      if (subJsonObject != null) {
+        String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
+        if (!Strings.isNullOrEmpty(nodeId) && nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
+          //populate it in otherInfo
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
+          if (otherInfo != null && nodeIdVal != null) {
+            otherInfo.put(Constants.NODE_ID, nodeIdVal);
+          }
+        }
+      }
+
+      subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+          .optJSONObject(1);
+      if (subJsonObject != null) {
+        String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
+        if (!Strings.isNullOrEmpty(containerId) && containerId.equalsIgnoreCase(Constants.CONTAINER_ID)) {
+          //populate it in otherInfo
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          String containerIdVal = subJsonObject.optString(Constants.ENTITY);
+          if (otherInfo != null && containerIdVal != null) {
+            otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
+          }
+        }
+      }
+      TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
+      this.attemptList.add(attemptInfo);
+      LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
+    }
+  }
+}
\ No newline at end of file
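
End-to-end use of the parser above (a sketch; the log path and DAG id are hypothetical):

    import java.io.File;
    import org.apache.tez.history.parser.SimpleHistoryParser;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class HistoryParseSketch {
      public static void main(String[] args) throws Exception {
        // The input file is the output of SimpleHistoryLoggingService.
        SimpleHistoryParser parser = new SimpleHistoryParser(new File("history.log"));
        DagInfo dagInfo = parser.getDAGData("dag_1438652049951_0008_1");
        System.out.println(dagInfo.getDagId() + " took " + dagInfo.getTimeTaken() + " ms");
      }
    }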

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index d2dac7d..6e227a5 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -33,6 +33,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -164,13 +165,25 @@ public class VertexInfo extends BaseInfo {
     updateEdgeInfo();
   }
 
+  public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() {
+    return Collections.unmodifiableList(additionalInputInfoList);
+  }
+
+  public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() {
+    return Collections.unmodifiableList(additionalOutputInfoList);
+  }
+
   @Override
   public final long getStartTimeInterval() {
     return startTime - (dagInfo.getStartTime());
   }
 
   public final long getFirstTaskStartTimeInterval() {
-    return getFirstTaskToStart().getStartTimeInterval();
+    TaskInfo firstTask = getFirstTaskToStart();
+    if (firstTask == null) {
+      return 0;
+    }
+    return firstTask.getStartTimeInterval();
   }
 
   public final long getLastTaskFinishTimeInterval() {
@@ -270,14 +283,32 @@ public class VertexInfo extends BaseInfo {
 
   }
 
+
+  private List<TaskInfo> getTasksInternal() {
+    return Lists.newLinkedList(taskInfoMap.values());
+  }
+
   /**
    * Get all tasks
    *
    * @return list of taskInfo
    */
   public final List<TaskInfo> getTasks() {
-    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
-    Collections.sort(taskInfoList, orderingOnStartTime());
+    return Collections.unmodifiableList(getTasksInternal());
+  }
+
+  /**
+   * Get all tasks, optionally sorted.
+   *
+   * @param sorted whether to sort the returned list
+   * @param ordering the ordering to apply; start-time ordering is used when null
+   * @return list of TaskInfo
+   */
+  public final List<TaskInfo> getTasks(boolean sorted, @Nullable Ordering<TaskInfo> ordering) {
+    List<TaskInfo> taskInfoList = getTasksInternal();
+    if (sorted) {
+      Collections.sort(taskInfoList, ((ordering == null) ? orderingOnStartTime() : ordering));
+    }
     return Collections.unmodifiableList(taskInfoList);
   }
 
@@ -352,12 +383,36 @@ public class VertexInfo extends BaseInfo {
     return Collections.unmodifiableList(outputVertices);
   }
 
-  public List<TaskAttemptInfo> getTaskAttempts() {
+  private List<TaskAttemptInfo> getTaskAttemptsInternal() {
     List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
     for (TaskInfo taskInfo : getTasks()) {
       taskAttemptInfos.addAll(taskInfo.getTaskAttempts());
     }
-    Collections.sort(taskAttemptInfos, orderingOnAttemptStartTime());
+    return taskAttemptInfos;
+  }
+
+  /**
+   * Get all task attempts.
+   *
+   * @return list of task attempts
+   */
+  public List<TaskAttemptInfo> getTaskAttempts() {
+    return Collections.unmodifiableList(getTaskAttemptsInternal());
+  }
+
+  /**
+   * Get all task attempts, optionally sorted.
+   *
+   * @param sorted whether to sort the returned list
+   * @param ordering the ordering to apply; attempt start-time ordering is used when null
+   * @return list of TaskAttemptInfo
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted,
+      @Nullable Ordering<TaskAttemptInfo> ordering) {
+    List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal();
+    if (sorted) {
+      Collections.sort(taskAttemptInfos, ((ordering == null) ? orderingOnAttemptStartTime() : ordering));
+    }
     return Collections.unmodifiableList(taskAttemptInfos);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
deleted file mode 100644
index 0d76e03..0000000
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.history;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.DAGCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.examples.WordCount;
-import org.apache.tez.history.parser.ATSFileParser;
-import org.apache.tez.history.parser.datamodel.DagInfo;
-import org.apache.tez.history.parser.datamodel.EdgeInfo;
-import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
-import org.apache.tez.history.parser.datamodel.TaskInfo;
-import org.apache.tez.history.parser.datamodel.VersionInfo;
-import org.apache.tez.history.parser.datamodel.VertexInfo;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
-import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
-import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.tests.MiniTezClusterWithTimeline;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestATSFileParser {
-
-  private static MiniDFSCluster miniDFSCluster;
-  private static MiniTezClusterWithTimeline miniTezCluster;
-
-  //location within miniHDFS cluster's hdfs
-  private static Path inputLoc = new Path("/tmp/sample.txt");
-
-  private final static String INPUT = "Input";
-  private final static String OUTPUT = "Output";
-  private final static String TOKENIZER = "Tokenizer";
-  private final static String SUMMATION = "Summation";
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem fs;
-  private static String TEST_ROOT_DIR =
-      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tmpDir";
-  private static String TEZ_BASE_DIR =
-      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez";
-  private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
-
-  private static TezClient tezClient;
-
-  private static int dagNumber;
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
-    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
-    miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    fs = miniDFSCluster.getFileSystem();
-    conf.set("fs.defaultFS", fs.getUri().toString());
-
-    setupTezCluster();
-  }
-
-  @AfterClass
-  public static void shutdownCluster() {
-    try {
-      if (tezClient != null) {
-        try {
-          tezClient.stop();
-        } catch (TezException e) {
-          //ignore
-        } catch (IOException e) {
-          //ignore
-        }
-      }
-      if (miniDFSCluster != null) {
-        miniDFSCluster.shutdown();
-      }
-      if (miniTezCluster != null) {
-        miniTezCluster.stop();
-      }
-    } finally {
-      try {
-        FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
-        FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
-      } catch (IOException e) {
-        //safe to ignore
-      }
-    }
-  }
-
-  // @Before
-  public static void setupTezCluster() throws Exception {
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
-
-    //Enable per edge counters
-    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
-    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
-        .class.getName());
-
-    miniTezCluster =
-        new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
-
-    miniTezCluster.init(conf);
-    miniTezCluster.start();
-
-    createSampleFile(inputLoc);
-
-    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
-    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
-    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-        ATSHistoryLoggingService.class.getName());
-
-    tezClient = TezClient.create("WordCount", tezConf, true);
-    tezClient.start();
-    tezClient.waitTillReady();
-  }
-
-
-  /**
-   * Run a word count example in mini cluster and check if it is possible to download
-   * data from ATS and parse it.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testParserWithSuccessfulJob() throws Exception {
-    //Run basic word count example.
-    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
-        WordCount.SumProcessor.class.getName(), "WordCount");
-
-    //Export the data from ATS
-    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
-
-    int result = ATSImportTool.process(args);
-    assertTrue(result == 0);
-
-    //Parse ATS data
-    DagInfo dagInfo = getDagInfo(dagId);
-
-    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
-    verifyDagInfo(dagInfo);
-
-    //Job specific
-    assertTrue(dagInfo.getNumVertices() == 2);
-    assertTrue(dagInfo.getName().equals("WordCount"));
-    assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
-        WordCount.TokenProcessor.class.getName()));
-    assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
-        .equals(WordCount.SumProcessor.class.getName()));
-    assertTrue(dagInfo.getEdges().size() == 1);
-    EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
-    assertTrue(edgeInfo.getDataMovementType().
-        equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
-    assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
-    assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
-    assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
-    assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
-    assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
-    assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
-    assertTrue(dagInfo.getVertices().size() == 2);
-    String lastSourceTA = null;
-    String lastDataEventSourceTA = null;
-    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
-      assertTrue(vertexInfo.getKilledTasksCount() == 0);
-      assertTrue(vertexInfo.getInitRequestedTime() > 0);
-      assertTrue(vertexInfo.getInitTime() > 0);
-      assertTrue(vertexInfo.getStartRequestedTime() > 0);
-      assertTrue(vertexInfo.getStartTime() > 0);
-      assertTrue(vertexInfo.getFinishTime() > 0);
-      long finishTime = 0;
-      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
-        assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
-        assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
-        assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
-        assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
-        assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
-        assertTrue(taskInfo.getContainersMapping().size() > 0);
-        assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
-        assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
-        assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
-        List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
-        if (vertexInfo.getVertexName().equals(TOKENIZER)) {
-          // get the last task to finish and track its successful attempt
-          if (finishTime < taskInfo.getFinishTime()) {
-            finishTime = taskInfo.getFinishTime();
-            lastSourceTA = taskInfo.getSuccessfulAttemptId();
-          }
-        } else {
-          for (TaskAttemptInfo attempt : attempts) {
-            assertTrue(attempt.getLastDataEventTime() > 0);
-            if (lastDataEventSourceTA == null) {
-              lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
-            } else {
-              // all attempts should have the same last data event source TA
-              assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
-            }
-          }
-        }
-        for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-          assertTrue(attemptInfo.getStartTimeInterval() > 0);
-          assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
-        }
-      }
-      assertTrue(vertexInfo.getLastTaskToFinish() != null);
-      if (vertexInfo.getVertexName().equals(TOKENIZER)) {
-        assertTrue(vertexInfo.getInputEdges().size() == 0);
-        assertTrue(vertexInfo.getOutputEdges().size() == 1);
-        assertTrue(vertexInfo.getOutputVertices().size() == 1);
-        assertTrue(vertexInfo.getInputVertices().size() == 0);
-      } else {
-        assertTrue(vertexInfo.getInputEdges().size() == 1);
-        assertTrue(vertexInfo.getOutputEdges().size() == 0);
-        assertTrue(vertexInfo.getOutputVertices().size() == 0);
-        assertTrue(vertexInfo.getInputVertices().size() == 1);
-      }
-    }
-    assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
-  }
-
-  /**
-   * Run a word count example in mini cluster.
-   * Provide invalid URL for ATS.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
-    //Run basic word count example.
-    String dagId =  runWordCount(WordCount.TokenProcessor.class.getName(),
-        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL");
-
-    //Export the data from ATS
-    String atsAddress = "--atsAddress=http://atsHost:8188";
-    String[] args = { "--dagId=" + dagId,
-        "--downloadDir=" + DOWNLOAD_DIR,
-        atsAddress
-      };
-
-    int result = ATSImportTool.process(args);
-    assertTrue(result == -1);
-  }
-
-  /**
-   * Run a failed job and parse the data from ATS
-   */
-  @Test
-  public void testParserWithFailedJob() throws Exception {
-    //Run a job which would fail
-    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
-        .getName(), "WordCount-With-Exception");
-
-    //Export the data from ATS
-    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
-
-    int result = ATSImportTool.process(args);
-    assertTrue(result == 0);
-
-    //Parse ATS data
-    DagInfo dagInfo = getDagInfo(dagId);
-
-    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
-    verifyDagInfo(dagInfo);
-
-    //Dag specific
-    VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
-    assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed
-    assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4);
-    assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
-
-    assertTrue(dagInfo.getFailedVertices().size() == 1);
-    assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
-    assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
-    assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
-
-    assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
-
-    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4);
-    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1);
-    verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5);
-
-    verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
-        "TaskCounter_Tokenizer_INPUT_Input", 10);
-    verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
-        "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
-    verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
-        "TaskCounter_Tokenizer_OUTPUT_Summation",
-        20); //Every line has 2 words. 10 lines x 2 words = 20
-    verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
-        "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
-    
-    for (TaskInfo taskInfo : summationVertex.getTasks()) {
-      String lastAttemptId = null;
-      for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-        if (lastAttemptId != null) {
-          // failed attempt should be causal TA of next attempt
-          assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
-        }
-        lastAttemptId = attemptInfo.getTaskAttemptId();
-      }
-    }
-
-    //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
-    //TaskCounter.REDUCE_INPUT_RECORDS
-
-    //Verify if the processor exception is given in diagnostics
-    assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
-
-  }
-
-  /**
-   * Create sample file for wordcount program
-   *
-   * @param inputLoc
-   * @throws IOException
-   */
-  private static void createSampleFile(Path inputLoc) throws IOException {
-    fs.deleteOnExit(inputLoc);
-    FSDataOutputStream out = fs.create(inputLoc);
-    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
-    for (int i = 0; i < 10; i++) {
-      writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
-      writer.newLine();
-    }
-    writer.close();
-  }
-
-  private DagInfo getDagInfo(String dagId) throws TezException {
-    //Parse downloaded contents
-    File downloadedFile = new File(DOWNLOAD_DIR
-        + Path.SEPARATOR + dagId
-        + Path.SEPARATOR + dagId + ".zip");
-    ATSFileParser parser = new ATSFileParser(downloadedFile);
-    DagInfo dagInfo = parser.getDAGData(dagId);
-    assertTrue(dagInfo.getDagId().equals(dagId));
-    return dagInfo;
-  }
-
-  private void verifyCounter(Map<String, TezCounter> counterMap,
-      String counterGroupName, long expectedVal) {
-    //Iterate through group-->tezCounter
-    for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
-      if (counterGroupName != null) {
-        if (entry.getKey().equals(counterGroupName)) {
-          assertTrue(entry.getValue().getValue() == expectedVal);
-        }
-      } else {
-        assertTrue(entry.getValue().getValue() == expectedVal);
-      }
-    }
-  }
-
-  private String runWordCount(String tokenizerProcessor, String summationProcessor,
-      String dagName)
-      throws Exception {
-    dagNumber++;
-
-    //HDFS path
-    Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
-
-    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
-        TextInputFormat.class, inputLoc.toString()).build();
-
-    DataSinkDescriptor dataSink =
-        MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build();
-
-    Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
-        tokenizerProcessor)).addDataSource(INPUT, dataSource);
-
-    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
-        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
-            HashPartitioner.class.getName()).build();
-
-    Vertex summationVertex = Vertex.create(SUMMATION,
-        ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink);
-
-    // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge
-    DAG dag = DAG.create(dagName);
-    dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
-        Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
-
-    DAGClient client = tezClient.submitDAG(dag);
-    client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-    TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), dagNumber);
-
-    return tezDAGID.toString();
-  }
-
-  /**
-   * Processor which would just throw exception.
-   */
-  public static class FailProcessor extends SimpleMRProcessor {
-    public FailProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      throw new Exception("Failing this processor for some reason");
-    }
-  }
-
-  private void verifyDagInfo(DagInfo dagInfo) {
-    VersionInfo versionInfo = dagInfo.getVersionInfo();
-    assertTrue(versionInfo != null); //should be present post 0.5.4
-    assertTrue(versionInfo.getVersion() != null);
-    assertTrue(versionInfo.getRevision() != null);
-    assertTrue(versionInfo.getBuildTime() != null);
-
-    assertTrue(dagInfo.getStartTime() > 0);
-    assertTrue(dagInfo.getFinishTimeInterval() > 0);
-    assertTrue(dagInfo.getStartTimeInterval() == 0);
-    assertTrue(dagInfo.getStartTime() > 0);
-    if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
-      assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
-    }
-    assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval());
-
-    assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
-    assertTrue(dagInfo.getTimeTaken() > 0);
-
-    //Verify all vertices
-    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
-      verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
-    }
-
-    VertexInfo fastestVertex = dagInfo.getFastestVertex();
-    assertTrue(fastestVertex != null);
-
-    if (dagInfo.getStatus().equals(DAGState.SUCCEEDED)) {
-      assertTrue(dagInfo.getSlowestVertex() != null);
-    }
-  }
-
-  private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
-    assertTrue(vertexInfo != null);
-    if (hasFailedTasks) {
-      assertTrue(vertexInfo.getFailedTasksCount() > 0);
-    }
-    assertTrue(vertexInfo.getStartTimeInterval() > 0);
-    assertTrue(vertexInfo.getStartTime() > 0);
-    assertTrue(vertexInfo.getFinishTimeInterval() > 0);
-    assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval());
-    assertTrue(vertexInfo.getVertexName() != null);
-    if (!hasFailedTasks) {
-      assertTrue(vertexInfo.getFinishTime() > 0);
-      assertTrue(vertexInfo.getFailedTasks().size() == 0);
-      assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
-      assertTrue(vertexInfo.getFailedTasksCount() == 0);
-      assertTrue(vertexInfo.getAvgTaskDuration() > 0);
-      assertTrue(vertexInfo.getMaxTaskDuration() > 0);
-      assertTrue(vertexInfo.getMinTaskDuration() > 0);
-      assertTrue(vertexInfo.getTimeTaken() > 0);
-      assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
-      assertTrue(vertexInfo.getCompletedTasksCount() > 0);
-      assertTrue(vertexInfo.getFirstTaskToStart() != null);
-      assertTrue(vertexInfo.getSucceededTasksCount() > 0);
-      assertTrue(vertexInfo.getTasks().size() > 0);
-    }
-
-    for (TaskInfo taskInfo : vertexInfo.getTasks()) {
-      if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
-        verifyTask(taskInfo, false);
-      }
-    }
-
-    for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
-      verifyTask(taskInfo, true);
-    }
-
-    assertTrue(vertexInfo.getProcessorClassName() != null);
-    assertTrue(vertexInfo.getStatus() != null);
-    assertTrue(vertexInfo.getDagInfo() != null);
-    assertTrue(vertexInfo.getInitTimeInterval() > 0);
-    assertTrue(vertexInfo.getNumTasks() > 0);
-  }
-
-  private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
-    assertTrue(taskInfo != null);
-    assertTrue(taskInfo.getStatus() != null);
-    assertTrue(taskInfo.getStartTimeInterval() > 0);
-
-    //Not testing for killed attempts. So if there are no failures, it should succeed
-    if (!hasFailedAttempts) {
-      assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
-      assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo
-          .getFinishTimeInterval());
-      assertTrue(
-          taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
-      assertTrue(taskInfo.getSuccessfulAttemptId() != null);
-      assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
-    }
-    assertTrue(taskInfo.getTaskId() != null);
-
-    for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-      verifyTaskAttemptInfo(attemptInfo);
-    }
-  }
-
-  private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
-    if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
-        .equals(TaskAttemptState.SUCCEEDED)) {
-      assertTrue(attemptInfo.getStartTimeInterval() > 0);
-      assertTrue(attemptInfo.getFinishTimeInterval() > 0);
-      assertTrue(attemptInfo.getStartTime() > 0);
-      assertTrue(attemptInfo.getFinishTime() > 0);
-      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
-      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval());
-      assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval());
-      assertTrue(attemptInfo.getNodeId() != null);
-      assertTrue(attemptInfo.getTimeTaken() != -1);
-      assertTrue(attemptInfo.getEvents() != null);
-      assertTrue(attemptInfo.getTezCounters() != null);
-      assertTrue(attemptInfo.getContainer() != null);
-    }
-    assertTrue(attemptInfo.getTaskInfo() != null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
new file mode 100644
index 0000000..c89acb2
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -0,0 +1,785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.examples.WordCount;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.BaseInfo;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHistoryParser {
+
+  private static MiniDFSCluster miniDFSCluster;
+  private static MiniTezClusterWithTimeline miniTezCluster;
+
+  //location within miniHDFS cluster's hdfs
+  private static Path inputLoc = new Path("/tmp/sample.txt");
+
+  private final static String INPUT = "Input";
+  private final static String OUTPUT = "Output";
+  private final static String TOKENIZER = "Tokenizer";
+  private final static String SUMMATION = "Summation";
+  private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+  private final static String HISTORY_TXT = "history.txt";
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static String TEST_ROOT_DIR =
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tmpDir";
+  private static String TEZ_BASE_DIR =
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tez";
+  private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+
+    setupTezCluster();
+  }
+
+  @AfterClass
+  public static void shutdownCluster() {
+    try {
+      if (miniDFSCluster != null) {
+        miniDFSCluster.shutdown();
+      }
+      if (miniTezCluster != null) {
+        miniTezCluster.stop();
+      }
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
+        FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
+      } catch (IOException e) {
+        //safe to ignore
+      }
+    }
+  }
+
+  // @Before
+  public static void setupTezCluster() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    //Enable per edge counters
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
+        .class.getName());
+
+    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
+
+    miniTezCluster =
+        new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+
+    createSampleFile(inputLoc);
+  }
+
+
+  /**
+   * Run a word count example in the mini cluster, download its data from ATS, and
+   * parse it. Also run with the SimpleHistoryLogging option and verify that the
+   * result matches the ATS data.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob() throws Exception {
+    //Run basic word count example.
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data and verify results
+    DagInfo dagInfoFromATS = getDagInfo(dagId);
+    verifyDagInfo(dagInfoFromATS, true);
+    verifyJobSpecificInfo(dagInfoFromATS);
+
+    //Now run with SimpleHistoryLogging
+    dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", false);
+    Thread.sleep(10000); //Allow pending flushes to complete, to avoid a partially written download.
+
+    DagInfo shDagInfo = getDagInfoFromSimpleHistory(dagId);
+    verifyDagInfo(shDagInfo, false);
+    verifyJobSpecificInfo(shDagInfo);
+
+    //Compare dagInfo by parsing ATS data with DagInfo obtained by parsing SimpleHistoryLog
+    isDAGEqual(dagInfoFromATS, shDagInfo);
+  }
+
+  private DagInfo getDagInfoFromSimpleHistory(String dagId) throws TezException, IOException {
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID
+        .getApplicationId(), 1);
+    Path historyPath = new Path(conf.get("fs.defaultFS")
+        + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+        + applicationAttemptId);
+    FileSystem fs = historyPath.getFileSystem(conf);
+
+    Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT);
+    fs.copyToLocalFile(historyPath, localPath);
+    File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
+
+    //Now parse via SimpleHistory
+    SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  private void verifyJobSpecificInfo(DagInfo dagInfo) {
+    //Job specific
+    assertTrue(dagInfo.getNumVertices() == 2);
+    assertTrue(dagInfo.getName().equals("WordCount"));
+    assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
+        WordCount.TokenProcessor.class.getName()));
+    assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
+        .equals(WordCount.SumProcessor.class.getName()));
+    assertTrue(dagInfo.getEdges().size() == 1);
+    EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
+    assertTrue(edgeInfo.getDataMovementType().
+        equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
+    assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
+    assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
+    assertTrue(dagInfo.getVertices().size() == 2);
+    String lastSourceTA = null;
+    String lastDataEventSourceTA = null;
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      assertTrue(vertexInfo.getKilledTasksCount() == 0);
+      assertTrue(vertexInfo.getInitRequestedTime() > 0);
+      assertTrue(vertexInfo.getInitTime() > 0);
+      assertTrue(vertexInfo.getStartRequestedTime() > 0);
+      assertTrue(vertexInfo.getStartTime() > 0);
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      long finishTime = 0;
+      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+        assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
+        assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
+        assertTrue(taskInfo.getContainersMapping().size() > 0);
+        assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
+        assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
+        assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+        List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
+        if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+          // get the last task to finish and track its successful attempt
+          if (finishTime < taskInfo.getFinishTime()) {
+            finishTime = taskInfo.getFinishTime();
+            lastSourceTA = taskInfo.getSuccessfulAttemptId();
+          }
+        } else {
+          for (TaskAttemptInfo attempt : attempts) {
+            assertTrue(attempt.getLastDataEventTime() > 0);
+            if (lastDataEventSourceTA == null) {
+              lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+            } else {
+              // all attempts should have the same last data event source TA
+              assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+            }
+          }
+        }
+        for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+          assertTrue(attemptInfo.getStartTimeInterval() > 0);
+          assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
+        }
+      }
+      assertTrue(vertexInfo.getLastTaskToFinish() != null);
+      if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+        assertTrue(vertexInfo.getInputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputVertices().size() == 1);
+        assertTrue(vertexInfo.getInputVertices().size() == 0);
+      } else {
+        assertTrue(vertexInfo.getInputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputVertices().size() == 0);
+        assertTrue(vertexInfo.getInputVertices().size() == 1);
+      }
+    }
+    assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
+  }
+
+  /**
+   * Run a word count example in the mini cluster, providing an invalid URL for ATS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
+    //Run basic word count example.
+    String dagId =  runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL", true);
+
+    //Export the data from ATS
+    String atsAddress = "--atsAddress=http://atsHost:8188";
+    String[] args = { "--dagId=" + dagId,
+        "--downloadDir=" + DOWNLOAD_DIR,
+        atsAddress
+      };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == -1);
+  }
+
+  /**
+   * Run a failed job and parse the data from ATS
+   */
+  @Test
+  public void testParserWithFailedJob() throws Exception {
+    //Run a job which would fail
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
+        .getName(), "WordCount-With-Exception", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data
+    DagInfo dagInfo = getDagInfo(dagId);
+
+    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
+    verifyDagInfo(dagInfo, true);
+
+    //Dag specific
+    VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
+    assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed
+    assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4);
+    assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
+
+    assertTrue(dagInfo.getFailedVertices().size() == 1);
+    assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
+    assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
+    assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
+
+    assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
+
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4);
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1);
+    verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5);
+
+    verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
+        "TaskCounter_Tokenizer_INPUT_Input", 10);
+    verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
+    verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation",
+        20); //Every line has 2 words. 10 lines x 2 words = 20
+    verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
+
+    for (TaskInfo taskInfo : summationVertex.getTasks()) {
+      String lastAttemptId = null;
+      for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+        if (lastAttemptId != null) {
+          // failed attempt should be causal TA of next attempt
+          assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
+        }
+        lastAttemptId = attemptInfo.getTaskAttemptId();
+      }
+    }
+
+    //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
+    //TaskCounter.REDUCE_INPUT_RECORDS
+
+    //Verify if the processor exception is given in diagnostics
+    assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
+
+  }
+
+  /**
+   * Explicit equality checks are done here instead of implementing equals() in
+   * DAG/Vertex/Edge, which would also require matching hashCode() changes. Some
+   * comparisons are customized for unit testing.
+   */
+  private void isDAGEqual(DagInfo dagInfo1, DagInfo dagInfo2) {
+    assertNotNull(dagInfo1);
+    assertNotNull(dagInfo2);
+    assertEquals(dagInfo1.getStatus(), dagInfo2.getStatus());
+    isEdgeEqual(dagInfo1.getEdges(), dagInfo2.getEdges());
+    isVertexEqual(dagInfo1.getVertices(), dagInfo2.getVertices());
+  }
+
+  private void isVertexEqual(VertexInfo vertexInfo1, VertexInfo vertexInfo2) {
+    assertTrue(vertexInfo1 != null);
+    assertTrue(vertexInfo2 != null);
+    assertTrue(vertexInfo1.getVertexName().equals(vertexInfo2.getVertexName()));
+    assertTrue(vertexInfo1.getProcessorClassName().equals(vertexInfo2.getProcessorClassName()));
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    assertTrue(vertexInfo1.getCompletedTasksCount() == vertexInfo2.getCompletedTasksCount());
+    assertTrue(vertexInfo1.getStatus().equals(vertexInfo2.getStatus()));
+
+    isEdgeEqual(vertexInfo1.getInputEdges(), vertexInfo2.getInputEdges());
+    isEdgeEqual(vertexInfo1.getOutputEdges(), vertexInfo2.getOutputEdges());
+
+    assertTrue(vertexInfo1.getInputVertices().size() == vertexInfo2.getInputVertices().size());
+    assertTrue(vertexInfo1.getOutputVertices().size() == vertexInfo2.getOutputVertices().size());
+
+    isTaskEqual(vertexInfo1.getTasks(), vertexInfo2.getTasks());
+  }
+
+  private void isVertexEqual(List<VertexInfo> vertexList1, List<VertexInfo> vertexList2) {
+    assertTrue("Vertices sizes should be the same", vertexList1.size() == vertexList2.size());
+    Iterator<VertexInfo> it1 = vertexList1.iterator();
+    Iterator<VertexInfo> it2 = vertexList2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      VertexInfo info1 = it1.next();
+      VertexInfo info2 = it2.next();
+      isVertexEqual(info1, info2);
+    }
+  }
+
+  private void isEdgeEqual(EdgeInfo edgeInfo1, EdgeInfo edgeInfo2) {
+    assertTrue(edgeInfo1 != null);
+    assertTrue(edgeInfo2 != null);
+    String info1 = edgeInfo1.toString();
+    String info2 = edgeInfo2.toString();
+    assertTrue(info1.equals(info2));
+  }
+
+  private void isEdgeEqual(Collection<EdgeInfo> info1, Collection<EdgeInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<EdgeInfo> it1 = info1.iterator();
+    Iterator<EdgeInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isEdgeEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(Collection<TaskInfo> info1, Collection<TaskInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskInfo> it1 = info1.iterator();
+    Iterator<TaskInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) {
+    assertTrue(taskInfo1 != null);
+    assertTrue(taskInfo2 != null);
+    assertTrue(taskInfo1.getVertexInfo() != null);
+    assertTrue(taskInfo2.getVertexInfo() != null);
+    assertTrue(taskInfo1.getStatus().equals(taskInfo2.getStatus()));
+    assertTrue(
+        taskInfo1.getVertexInfo().getVertexName()
+            .equals(taskInfo2.getVertexInfo().getVertexName()));
+    isTaskAttemptEqual(taskInfo1.getTaskAttempts(), taskInfo2.getTaskAttempts());
+
+    //Verify counters
+    isCountersSame(taskInfo1, taskInfo2);
+  }
+
+  private void isCountersSame(BaseInfo info1, BaseInfo info2) {
+    isCounterSame(info1.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()),
+        info2.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.SPILLED_RECORDS.name()),
+        info2.getCounter(TaskCounter.SPILLED_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_BYTES.name()),
+        info2.getCounter(TaskCounter.OUTPUT_BYTES.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()));
+  }
+
+  private void isCounterSame(Map<String, TezCounter> counter1, Map<String, TezCounter> counter2) {
+    for (Map.Entry<String, TezCounter> entry : counter1.entrySet()) {
+      long val = entry.getValue().getValue();
+
+      //check if other counter has the same value
+      assertTrue(counter2.containsKey(entry.getKey()));
+      assertTrue(counter2.get(entry.getKey()).getValue() == val);
+    }
+  }
+
+  private void isTaskAttemptEqual(Collection<TaskAttemptInfo> info1,
+      Collection<TaskAttemptInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskAttemptInfo> it1 = info1.iterator();
+    Iterator<TaskAttemptInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskAttemptEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskAttemptEqual(TaskAttemptInfo info1, TaskAttemptInfo info2) {
+    assertTrue(info1 != null);
+    assertTrue(info2 != null);
+    assertTrue(info1.getTaskInfo() != null);
+    assertTrue(info2.getTaskInfo() != null);
+    assertTrue(info1.getStatus().equals(info2.getStatus()));
+    assertTrue(info1.getTaskInfo().getVertexInfo().getVertexName().equals(info2.getTaskInfo()
+        .getVertexInfo().getVertexName()));
+
+    //Verify counters
+    isCountersSame(info1, info2);
+  }
+
+
+  /**
+   * Create sample file for wordcount program
+   *
+   * @param inputLoc
+   * @throws IOException
+   */
+  private static void createSampleFile(Path inputLoc) throws IOException {
+    fs.deleteOnExit(inputLoc);
+    FSDataOutputStream out = fs.create(inputLoc);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    for (int i = 0; i < 10; i++) {
+      writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
+      writer.newLine();
+    }
+    writer.close();
+  }
+
+  private DagInfo getDagInfo(String dagId) throws TezException {
+    //Parse downloaded contents
+    File downloadedFile = new File(DOWNLOAD_DIR
+        + Path.SEPARATOR + dagId
+        + Path.SEPARATOR + dagId + ".zip");
+    ATSFileParser parser = new ATSFileParser(downloadedFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  private void verifyCounter(Map<String, TezCounter> counterMap,
+      String counterGroupName, long expectedVal) {
+    //Iterate through group-->tezCounter
+    for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
+      if (counterGroupName != null) {
+        if (entry.getKey().equals(counterGroupName)) {
+          assertTrue(entry.getValue().getValue() == expectedVal);
+        }
+      } else {
+        assertTrue(entry.getValue().getValue() == expectedVal);
+      }
+    }
+  }
+
+  TezClient getTezClient(boolean withTimeline) throws Exception {
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    if (withTimeline) {
+      tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, withTimeline);
+      tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+    } else {
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          SimpleHistoryLoggingService.class.getName());
+    }
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+
+    TezClient tezClient = TezClient.create("WordCount", tezConf, false);
+    tezClient.start();
+    tezClient.waitTillReady();
+    return tezClient;
+  }
+
+  private String runWordCount(String tokenizerProcessor, String summationProcessor,
+      String dagName, boolean withTimeline)
+      throws Exception {
+    //HDFS path
+    Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
+
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
+        TextInputFormat.class, inputLoc.toString()).build();
+
+    DataSinkDescriptor dataSink =
+        MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build();
+
+    Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
+        tokenizerProcessor)).addDataSource(INPUT, dataSource);
+
+    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    Vertex summationVertex = Vertex.create(SUMMATION,
+        ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink);
+
+    // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge
+    DAG dag = DAG.create(dagName);
+    dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
+        Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
+
+    TezClient tezClient = getTezClient(withTimeline);
+    DAGClient client = tezClient.submitDAG(dag);
+    client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+    TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
+
+    tezClient.stop();
+    return tezDAGID.toString();
+  }
+
+  /**
+   * Processor which just throws an exception.
+   */
+  public static class FailProcessor extends SimpleMRProcessor {
+    public FailProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      throw new Exception("Failing this processor for some reason");
+    }
+  }
+
+  private void verifyDagInfo(DagInfo dagInfo, boolean ats) {
+    if (ats) {
+      VersionInfo versionInfo = dagInfo.getVersionInfo();
+      assertTrue(versionInfo != null); //should be present post 0.5.4
+      assertTrue(versionInfo.getVersion() != null);
+      assertTrue(versionInfo.getRevision() != null);
+      assertTrue(versionInfo.getBuildTime() != null);
+    }
+
+    assertTrue(dagInfo.getStartTime() > 0);
+    assertTrue(dagInfo.getFinishTimeInterval() > 0);
+    assertTrue(dagInfo.getStartTimeInterval() == 0);
+    if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
+      assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
+    }
+    assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval());
+
+    assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
+    assertTrue(dagInfo.getTimeTaken() > 0);
+
+    //Verify all vertices
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
+    }
+
+    VertexInfo fastestVertex = dagInfo.getFastestVertex();
+    assertTrue(fastestVertex != null);
+
+    if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
+      assertTrue(dagInfo.getSlowestVertex() != null);
+    }
+  }
+
+  private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
+    assertTrue(vertexInfo != null);
+    if (hasFailedTasks) {
+      assertTrue(vertexInfo.getFailedTasksCount() > 0);
+    }
+    assertTrue(vertexInfo.getStartTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTime() > 0);
+    assertTrue(vertexInfo.getFinishTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval());
+    assertTrue(vertexInfo.getVertexName() != null);
+    if (!hasFailedTasks) {
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      assertTrue(vertexInfo.getFailedTasks().size() == 0);
+      assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
+      assertTrue(vertexInfo.getFailedTasksCount() == 0);
+      assertTrue(vertexInfo.getAvgTaskDuration() > 0);
+      assertTrue(vertexInfo.getMaxTaskDuration() > 0);
+      assertTrue(vertexInfo.getMinTaskDuration() > 0);
+      assertTrue(vertexInfo.getTimeTaken() > 0);
+      assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
+      assertTrue(vertexInfo.getCompletedTasksCount() > 0);
+      assertTrue(vertexInfo.getFirstTaskToStart() != null);
+      assertTrue(vertexInfo.getSucceededTasksCount() > 0);
+      assertTrue(vertexInfo.getTasks().size() > 0);
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+      if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
+        verifyTask(taskInfo, false);
+      }
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
+      verifyTask(taskInfo, true);
+    }
+
+    assertTrue(vertexInfo.getProcessorClassName() != null);
+    assertTrue(vertexInfo.getStatus() != null);
+    assertTrue(vertexInfo.getDagInfo() != null);
+    assertTrue(vertexInfo.getInitTimeInterval() > 0);
+    assertTrue(vertexInfo.getNumTasks() > 0);
+  }
+
+  private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
+    assertTrue(taskInfo != null);
+    assertTrue(taskInfo.getStatus() != null);
+    assertTrue(taskInfo.getStartTimeInterval() > 0);
+
+    //Killed attempts are not checked here, so a task with no failed attempts should have succeeded
+    if (!hasFailedAttempts) {
+      assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
+      assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo
+          .getFinishTimeInterval());
+      assertTrue(
+          taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
+      assertTrue(taskInfo.getSuccessfulAttemptId() != null);
+      assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+    }
+    assertTrue(taskInfo.getTaskId() != null);
+
+    for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+      verifyTaskAttemptInfo(attemptInfo);
+    }
+  }
+
+  private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
+    if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
+        .equals(TaskAttemptState.SUCCEEDED.toString())) {
+      assertTrue(attemptInfo.getStartTimeInterval() > 0);
+      assertTrue(attemptInfo.getFinishTimeInterval() > 0);
+      assertTrue(attemptInfo.getStartTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
+      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval());
+      assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval());
+      assertTrue(attemptInfo.getNodeId() != null);
+      assertTrue(attemptInfo.getTimeTaken() != -1);
+      assertTrue(attemptInfo.getEvents() != null);
+      assertTrue(attemptInfo.getTezCounters() != null);
+      assertTrue(attemptInfo.getContainer() != null);
+    }
+    assertTrue(attemptInfo.getTaskInfo() != null);
+  }
+}

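Between the download step and the assertions above, the test round-trips a finished DAG through the ATS file parser. For reference, a minimal standalone sketch of that flow; the zip path and dag id are hypothetical, and the ATSFileParser package is assumed from the tez-history-parser layout:

    import java.io.File;
    import org.apache.tez.history.parser.ATSFileParser; // package assumed
    import org.apache.tez.history.parser.datamodel.DagInfo;
    import org.apache.tez.history.parser.datamodel.VertexInfo;

    public class ParseDownloadedDag {
      public static void main(String[] args) throws Exception {
        // Zip previously downloaded from the timeline server for a finished DAG.
        File atsZip = new File("/tmp/ats-data/dag_1447.zip");
        ATSFileParser parser = new ATSFileParser(atsZip);
        DagInfo dagInfo = parser.getDAGData("dag_1447"); // id must match the zip contents
        for (VertexInfo vertexInfo : dagInfo.getVertices()) {
          System.out.println(vertexInfo.getVertexName()
              + " took " + vertexInfo.getTimeTaken() + " ms");
        }
      }
    }
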
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml
index fe28b14..36b12fe 100644
--- a/tez-tools/analyzers/job-analyzer/pom.xml
+++ b/tez-tools/analyzers/job-analyzer/pom.xml
@@ -38,6 +38,15 @@
       <artifactId>tez-history-parser</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.plutext</groupId>
+      <artifactId>jaxb-svg11</artifactId>
+      <version>1.0.2</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
index 4151a90..27ad95e 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
@@ -19,15 +19,14 @@
 package org.apache.tez.analyzer;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.base.Strings;
 import org.apache.tez.dag.api.TezException;
 
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
@@ -99,7 +98,7 @@ public class CSVResult implements Result {
 
       StringBuilder sb = new StringBuilder();
       for(int i=0;i<record.length;i++) {
-        sb.append(Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
+        sb.append(!Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
         if (i < record.length - 1) {
           sb.append(",");
         }

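The one-line CSVResult change above fixes an inverted ternary: the old code appended the raw cell only when it was null or empty, and replaced every real value with a single space. A small sketch of the corrected behavior (the record values are hypothetical; Strings is com.google.common.base.Strings):

    String[] record = { "v1", null, "123" };
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < record.length; i++) {
      // Non-empty cells are written as-is; null/empty cells degrade to a
      // single space so every row keeps the same number of columns.
      sb.append(!Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
      if (i < record.length - 1) {
        sb.append(",");
      }
    }
    // sb.toString() -> "v1, ,123"
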
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 6748f3f..88d45f3 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -19,6 +19,8 @@
 package org.apache.tez.analyzer.plugins;
 
 import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -26,10 +28,13 @@ import com.google.common.collect.Ordering;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.analyzer.Analyzer;
 import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -43,24 +48,43 @@ public class CriticalPathAnalyzer implements Analyzer {
 
   private final CSVResult csvResult;
 
+  private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+  private final String dotFileLocation;
+
+  private static final String CONNECTOR = "-->";
+
   public CriticalPathAnalyzer(Configuration config) {
     this.config = config;
     this.csvResult = new CSVResult(headers);
+    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
   }
 
   @Override public void analyze(DagInfo dagInfo) throws TezException {
     Map<String, Long> result = Maps.newLinkedHashMap();
     getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
 
-    System.out.println();
-    System.out.println();
-
-    for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) {
+    Map<String, Long> sortedByValues = sortByValues(result);
+    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
       List<String> record = Lists.newLinkedList();
       record.add(entry.getKey());
       record.add(entry.getValue() + "");
       csvResult.addRecord(record.toArray(new String[record.size()]));
-      System.out.println(entry.getKey() + ", " + entry.getValue());
+    }
+
+    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+    try {
+      List<String> criticalVertices;
+      if (!sortedByValues.isEmpty()) {
+        String criticalPath = sortedByValues.keySet().iterator().next();
+        criticalVertices = getVertexNames(criticalPath);
+      } else {
+        criticalVertices = Lists.newLinkedList();
+      }
+      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+    } catch (IOException e) {
+      throw new TezException(e);
     }
   }
 
@@ -98,7 +122,7 @@ public class CriticalPathAnalyzer implements Analyzer {
 
     if (dest != null) {
       time += dest.getTimeTaken();
-      predecessor += destVertexName + "-->";
+      predecessor += destVertexName + CONNECTOR;
 
       for (VertexInfo incomingVertex : dest.getInputVertices()) {
         getCriticalPath(predecessor, incomingVertex, time, result);
@@ -107,4 +131,12 @@ public class CriticalPathAnalyzer implements Analyzer {
       result.put(predecessor, time);
     }
   }
+
+  private static List<String> getVertexNames(String criticalPath) {
+    if (Strings.isNullOrEmpty(criticalPath)) {
+      return Lists.newLinkedList();
+    }
+    return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
+        (criticalPath));
+  }
 }

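With the dot-file support added here, running the critical path analysis also emits a graphviz file for the DAG. A short usage sketch, assuming a DagInfo obtained from the history parser as above (the output directory is hypothetical; the default is the current directory, and exception handling is omitted):

    Configuration conf = new Configuration();
    // Key declared as DOT_FILE_DIR above; default is ".".
    conf.set("tez.critical-path.analyzer.dot.output.loc", "/tmp/dag-viz");
    CriticalPathAnalyzer analyzer = new CriticalPathAnalyzer(conf);
    analyzer.analyze(dagInfo); // also writes /tmp/dag-viz/<dagId>.dot

The vertices on the longest (critical) path are handed to Utils.generateDAGVizFile so they can be marked in the generated graph, and the .dot file can then be rendered with graphviz, e.g. "dot -Tpng <dagId>.dot -o dag.png".
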
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
index 67b4c51..7ed52da 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -108,7 +108,8 @@ public class LocalityAnalyzer implements Analyzer {
         record.add(otherTaskResult.avgRuntime + "");
 
         //Get the number of inputs to this vertex
-        record.add(vertexInfo.getInputEdges().size() + "");
+        record.add(vertexInfo.getInputEdges().size()
+            + vertexInfo.getAdditionalInputInfoList().size() + "");
 
         //Get the avg HDFS bytes read in this vertex for different type of locality
         record.add(dataLocalResult.avgHDFSBytesRead + "");
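
The widened input count above matters for source vertices: a map-style vertex fed only by a data source such as MRInput has zero incoming edges, so counting getInputEdges() alone under-reported its inputs. A sketch of what the new record value captures:

    // Inputs reach a vertex from two places; the record now sums both.
    int edgeInputs = vertexInfo.getInputEdges().size();                // edges from upstream vertices
    int sourceInputs = vertexInfo.getAdditionalInputInfoList().size(); // data sources such as MRInput
    record.add((edgeInputs + sourceInputs) + "");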