Posted to commits@tez.apache.org by je...@apache.org on 2015/12/07 23:42:55 UTC

[2/5] tez git commit: TEZ-2973. Backport Analyzers to branch-0.7

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
new file mode 100644
index 0000000..ec72df1
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Get locality information for the tasks of each vertex, along with their task execution times.
+ * This is helpful for checking whether vertex runtime correlates with data
+ * locality.
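+ * <p/>
+ * For example (hypothetical numbers): a vertex with a dataLocalRatio of 0.3, below the default
+ * tez.locality-analyzer.data.local.ratio of 0.5, gets a recommendation to tune the container
+ * reuse locality settings.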
+ */
+public class LocalityAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  private final String[] headers = { "vertexName", "numTasks", "dataLocalRatio", "rackLocalRatio",
+      "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime",
+      "avgOtherLocalTaskRuntime", "noOfInputs", "avgHDFSBytesRead_DataLocal",
+      "avgHDFSBytesRead_RackLocal", "avgHDFSBytesRead_Others", "recommendation" };
+
+  private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio";
+  private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f;
+
+  private final Configuration config;
+
+  private final CSVResult csvResult;
+
+  public LocalityAnalyzer(Configuration config) {
+    this.config = config;
+    csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+
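+      // getCounter(groupName, counterName) returns counters keyed by counter group name;
+      // the DAGCounter group entry below holds the vertex-level count for each locality type.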
+      Map<String, TezCounter> dataLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(),
+          DAGCounter.DATA_LOCAL_TASKS.toString());
+      Map<String, TezCounter> rackLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(),
+          DAGCounter.RACK_LOCAL_TASKS.toString());
+
+      long dataLocalTasks = 0;
+      long rackLocalTasks = 0;
+
+      if (!dataLocalTask.isEmpty()) {
+        dataLocalTasks = dataLocalTask.get(DAGCounter.class.getName()).getValue();
+      }
+
+      if (!rackLocalTask.isEmpty()) {
+        rackLocalTasks = rackLocalTask.get(DAGCounter.class.getName()).getValue();
+      }
+
+      long totalVertexTasks = vertexInfo.getNumTasks();
+
+      if (dataLocalTasks > 0 || rackLocalTasks > 0) {
+        //compute locality details.
+        float dataLocalRatio = dataLocalTasks * 1.0f / totalVertexTasks;
+        float rackLocalRatio = rackLocalTasks * 1.0f / totalVertexTasks;
+        float othersRatio = (totalVertexTasks - (dataLocalTasks + rackLocalTasks)) * 1.0f /
+            totalVertexTasks;
+
+        List<String> record = Lists.newLinkedList();
+        record.add(vertexName);
+        record.add(totalVertexTasks + "");
+        record.add(dataLocalRatio + "");
+        record.add(rackLocalRatio + "");
+        record.add(othersRatio + "");
+
+        TaskAttemptDetails dataLocalResult = computeAverages(vertexInfo,
+            DAGCounter.DATA_LOCAL_TASKS);
+        TaskAttemptDetails rackLocalResult = computeAverages(vertexInfo,
+            DAGCounter.RACK_LOCAL_TASKS);
+        TaskAttemptDetails otherTaskResult = computeAverages(vertexInfo,
+            DAGCounter.OTHER_LOCAL_TASKS);
+
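+        // Average attempt runtime per locality type (0 when no attempt ran with that locality).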
+        record.add(dataLocalResult.avgRuntime + "");
+        record.add(rackLocalResult.avgRuntime + "");
+        record.add(otherTaskResult.avgRuntime + "");
+
+        //Get the number of inputs to this vertex
+        record.add(vertexInfo.getInputEdges().size()
+            + vertexInfo.getAdditionalInputInfoList().size() + "");
+
+        //Get the avg HDFS bytes read in this vertex for different type of locality
+        record.add(dataLocalResult.avgHDFSBytesRead + "");
+        record.add(rackLocalResult.avgHDFSBytesRead + "");
+        record.add(otherTaskResult.avgHDFSBytesRead + "");
+
+        String recommendation = "";
+        if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) {
+          recommendation = "Data locality is poor for this vertex. Try tuning "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED;
+        }
+
+        record.add(recommendation);
+        csvResult.addRecord(record.toArray(new String[record.size()]));
+      }
+    }
+  }
+
+  /**
+   * Compute average runtime and average HDFS bytes read for the task attempts of a vertex
+   * that ran with the given locality.
+   *
+   * @param vertexInfo vertex to examine
+   * @param counter locality counter (e.g. DATA_LOCAL_TASKS) used to filter attempts
+   * @return task attempt details
+   */
+  private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter counter) {
+    long totalTime = 0;
+    long totalTasks = 0;
+    long totalHDFSBytesRead = 0;
+
+    TaskAttemptDetails result = new TaskAttemptDetails();
+
+    for(TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+      Map<String, TezCounter> localityCounter = attemptInfo.getCounter(DAGCounter.class.getName(),
+          counter.toString());
+
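+      // Attempts whose locality counter is present and positive ran with this locality type;
+      // all others are excluded from the averages.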
+      if (!localityCounter.isEmpty() &&
+          localityCounter.get(DAGCounter.class.getName()).getValue() > 0) {
+        totalTime += attemptInfo.getTimeTaken();
+        totalTasks++;
+
+        //get HDFSBytes read counter
+        Map<String, TezCounter> hdfsBytesReadCounter = attemptInfo.getCounter(FileSystemCounter
+            .class.getName(), FileSystemCounter.HDFS_BYTES_READ.name());
+        for(Map.Entry<String, TezCounter> entry : hdfsBytesReadCounter.entrySet()) {
+          totalHDFSBytesRead += entry.getValue().getValue();
+        }
+      }
+    }
+    if (totalTasks > 0) {
+      result.avgRuntime = (totalTime * 1.0f / totalTasks);
+      result.avgHDFSBytesRead = (totalHDFSBytesRead * 1.0f / totalTasks);
+    }
+    return result;
+  }
+
+  @Override public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override public String getName() {
+    return "Locality Analyzer";
+  }
+
+  @Override public String getDescription() {
+    return "Analyze for locality information (data local, rack local, off-rack)";
+  }
+
+  @Override public Configuration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Placeholder for task attempt details
+   */
+  static class TaskAttemptDetails {
+    float avgHDFSBytesRead;
+    float avgRuntime;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    LocalityAnalyzer analyzer = new LocalityAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
new file mode 100644
index 0000000..57e91c6
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Analyze the time taken by the merge phase and the shuffle phase, and the time spent doing
+ * real work, for each task.
+ *
+ * Dumps REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, their ratio, and SHUFFLE_BYTES for task
+ * attempts grouped by vertex, along with the time taken. Rendered as a plain table for now.
+ *
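+ * For example (hypothetical numbers): an attempt with enough reduce input records to be
+ * analyzed that took 100s overall with a 70s merge phase has a real-work ratio of 0.3;
+ * below the default threshold of 0.5, so it is flagged in the Comments column.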
+ */
+public class ShuffleTimeAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  /**
+   * Ratio of (total time taken by task - merge phase time) / (total time taken by task),
+   * i.e. the fraction of the attempt spent doing real work rather than shuffle/merge.
+   */
+  private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio";
+  private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f;
+
+  /**
+   * Minimum number of reduce input records a task attempt must have in order to be analyzed.
+   */
+  private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
+  private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
+
+  private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
+      "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
+      "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
+      "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
+      "SHUFFLE_BYTES_DISK_DIRECT" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  private final float realWorkDoneRatio;
+  private final long minShuffleRecords;
+
+
+  public ShuffleTimeAnalyzer(Configuration config) {
+    this.config = config;
+
+    realWorkDoneRatio = config.getFloat
+        (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
+    minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //counter_group (basically source) --> counter
+        Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+            .REDUCE_INPUT_GROUPS.toString());
+        Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+            .REDUCE_INPUT_RECORDS.toString());
+
+        if (reduceInputGroups == null) {
+          continue;
+        }
+
+        for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+          String counterGroupName = entry.getKey();
+          long reduceInputGroupsVal = entry.getValue().getValue();
+          long reduceInputRecordsVal = (reduceInputRecords.get(counterGroupName) != null) ?
+              reduceInputRecords.get(counterGroupName).getValue() : 0;
+
+          if (reduceInputRecordsVal <= 0) {
+            continue;
+          }
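+          // Ratio of distinct keys to records from this source; low values mean heavy grouping.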
+          float ratio = (reduceInputGroupsVal * 1.0f / reduceInputRecordsVal);
+
+          if (ratio > 0 && reduceInputRecordsVal > minShuffleRecords) {
+            List<String> result = Lists.newLinkedList();
+            result.add(vertexInfo.getVertexName());
+            result.add(attemptInfo.getTaskAttemptId());
+            result.add(attemptInfo.getNodeId());
+            result.add(counterGroupName);
+
+            //Real work done in the task
+            String comments = "";
+            String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME,
+                counterGroupName, attemptInfo);
+            String timeTakenForRealWork = "";
+            if (!Strings.isNullOrEmpty(mergePhaseTime)) {
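+              // Approximate real work as the attempt runtime minus time spent in the merge phase.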
+              long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime);
+
+              if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio) {
+                comments = "Time taken in shuffle is more than the actual work being done in task. "
+                    + " Check if source/destination machine is a slow node. Check if merge phase "
+                    + "time is more to understand disk bottlenecks in this node.  Check for skew";
+              }
+
+              timeTakenForRealWork = Long.toString(realWorkDone);
+            }
+            result.add(comments);
+
+            result.add(reduceInputGroupsVal + "");
+            result.add(reduceInputRecordsVal + "");
+            result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));
+
+            result.add(Long.toString(attemptInfo.getTimeTaken()));
+
+            //Total time taken for receiving all events from source tasks
+            result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo));
+
+            result.add(timeTakenForRealWork);
+
+            result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, counterGroupName, attemptInfo));
+
+            csvResult.addRecord(result.toArray(new String[result.size()]));
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Time taken to receive all events from source tasks
+   * (LAST_EVENT_RECEIVED - FIRST_EVENT_RECEIVED).
+   *
+   * @param counterGroupName counter group (i.e. source) to look up
+   * @param attemptInfo task attempt to examine
+   * @return the difference as a string, or "" when either counter is missing
+   */
+  private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
+    String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+    String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+
+    if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived)) {
+      return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived));
+    } else {
+      return "";
+    }
+  }
+
+  private String getCounterValue(TaskCounter counter, String counterGroupName,
+      TaskAttemptInfo attemptInfo) {
+    Map<String, TezCounter> tezCounterMap = attemptInfo.getCounter(counter.toString());
+    if (tezCounterMap != null) {
+      for (Map.Entry<String, TezCounter> entry : tezCounterMap.entrySet()) {
+        String groupName = entry.getKey();
+        long val = entry.getValue().getValue();
+        if (groupName.equals(counterGroupName)) {
+          return Long.toString(val);
+        }
+      }
+    }
+    return "";
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Shuffle time analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze the time taken for shuffle, merge "
+        + "and the real work done in the task";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
new file mode 100644
index 0000000..067d871
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * <p/>
+ * Identify skew: compute the (REDUCE_INPUT_GROUPS / REDUCE_INPUT_RECORDS) ratio for all task
+ * attempts and report the ones that fall outside the configured thresholds.
+ * <p/>
+ * <p/>
+ * - Case 1: Ratio of (reduce_input_groups / reduce_input_records) below the min ratio (default
+ * 0.2) and SHUFFLE_BYTES > 1 GB per task attempt from a source. A few keys have too many
+ * records; either the partitioning is wrong, or the memory limit for this vertex needs to be
+ * increased.
+ * <p/>
+ * - Case 2: Ratio of (reduce_input_groups / reduce_input_records) above the max ratio (default
+ * 0.4), the number of reduce input records in the task attempt more than 60% of the overall
+ * number of records at the vertex level, and numTasks in the vertex greater than 1. This
+ * indicates wrong partitioning (also consider reducing the number of tasks for that vertex).
+ * In some cases too many reducers are launched, and this can help find those.
+ * <p/>
+ * - Case 3: Ratio of (reduce_input_groups / reduce_input_records) between the min and max
+ * ratios per task attempt, numTasks greater than 1, and SHUFFLE_BYTES > 1 GB per task attempt
+ * from a source. Consider increasing parallelism based on the task attempt runtime.
+ * <p/>
+ */
+public class SkewAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  /**
+   * Amount of bytes that was sent as shuffle bytes from source. If it is below this threshold,
+   * it would not be considered for analysis.
+   */
+  private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
+      + ".bytes.per.source";
+  private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;
+
+  //Min reducer input group : reducer keys ratio for computation
+  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO = "tez.skew-analyzer.shuffle.key"
+      + ".group.min.ratio";
+  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT = 0.2f;
+
+  //Max reducer input group : reducer keys ratio for computation
+  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO = "tez.skew-analyzer.shuffle.key"
+      + ".group.max.ratio";
+  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT = 0.4f;
+
+
+
+  private static final String[] headers = { "vertexName", "taskAttemptId", "counterGroup", "node",
+      "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", "timeTaken",
+      "observation" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  private final float minRatio;
+  private final float maxRatio;
+  private final long maxShuffleBytesPerSource;
+
+  public SkewAnalyzer(Configuration config) {
+    this.config = config;
+    maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO,
+        ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT);
+    minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO,
+        ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT);
+    maxShuffleBytesPerSource = config.getLong(SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE,
+        SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Preconditions.checkArgument(dagInfo != null, "DAG can't be null");
+    analyzeReducers(dagInfo);
+  }
+
+  private void analyzeReducers(DagInfo dagInfo) {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        analyzeGroupSkewPerSource(attemptInfo);
+        analyzeRecordSkewPerSource(attemptInfo);
+        analyzeForParallelism(attemptInfo);
+      }
+    }
+  }
+
+  /**
+   * Analyze the scenario where a few keys have too many records per source.
+   *
+   * @param attemptInfo task attempt to examine
+   */
+  private void analyzeGroupSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_GROUPS.toString());
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      if (entry.getKey().equals(TaskCounter.class.getName())) {
+        //TODO: Tez counters always end up adding fgroups and groups, due to which we end up
+        // getting TaskCounter details as well.
+        continue;
+      }
+
+      String counterGroup = entry.getKey();
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+          .get(counterGroup).getValue() : 0;
+      long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get
+          (counterGroup).getValue() : 0;
+
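+      // Low ratio: records collapse into relatively few keys, i.e. a handful of keys dominate.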
+      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+      //Case 1: Couple of keys having too many records per source.
+      if (shuffleBytesPerSource > maxShuffleBytesPerSource) {
+        if (ratio < minRatio) {
+          List<String> result = Lists.newLinkedList();
+          result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+          result.add(attemptInfo.getTaskAttemptId());
+          result.add(counterGroup);
+          result.add(attemptInfo.getNodeId());
+          result.add(inputGroupsCount + "");
+          result.add(inputRecordsCount + "");
+          result.add(ratio + "");
+          result.add(shuffleBytesPerSource + "");
+          result.add(attemptInfo.getTimeTaken() + "");
+          result.add("Please check partitioning. Otherwise consider increasing memLimit");
+
+          csvResult.addRecord(result.toArray(new String[result.size()]));
+        }
+      }
+    }
+  }
+
+  /**
+   * Analyze the scenario where one task receives more than 60% of the vertex-level records.
+   *
+   * @param attemptInfo task attempt to examine
+   */
+  private void analyzeRecordSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+    Map<String, TezCounter> vertexLevelReduceInputRecords =
+        attemptInfo.getTaskInfo().getVertexInfo()
+            .getCounter(TaskCounter.REDUCE_INPUT_RECORDS.toString());
+
+    int vertexNumTasks = attemptInfo.getTaskInfo().getVertexInfo().getNumTasks();
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_GROUPS.toString());
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      if (entry.getKey().equals(TaskCounter.class.getName())) {
+        //TODO: Tez counters always end up adding fgroups and groups, due to which we end up
+        // getting TaskCounter details as well.
+        continue;
+      }
+
+      String counterGroup = entry.getKey();
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+          .get(counterGroup).getValue() : 0;
+      long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get
+          (counterGroup).getValue() : 0;
+      long vertexLevelInputRecordsCount = (vertexLevelReduceInputRecords.get(counterGroup) != null)
+          ? vertexLevelReduceInputRecords.get(counterGroup).getValue() : 0;
+
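+      // Fraction of the vertex's reduce input records consumed by this single attempt.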
+      float ratio = (inputRecordsCount * 1.0f / vertexLevelInputRecordsCount);
+
+      if (vertexNumTasks > 1) {
+        if (ratio > maxRatio) {
+          //input records > 60% of vertex level record count
+          if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) {
+            List<String> result = Lists.newLinkedList();
+            result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+            result.add(attemptInfo.getTaskAttemptId());
+            result.add(counterGroup);
+            result.add(attemptInfo.getNodeId());
+            result.add(inputGroupsCount + "");
+            result.add(inputRecordsCount + "");
+            result.add(ratio + "");
+            result.add(shuffleBytesPerSource + "");
+            result.add(attemptInfo.getTimeTaken() + "");
+            result.add("Some task attempts are getting > 60% of reduce input records. "
+                + "Consider adjusting parallelism & check partition logic");
+
+            csvResult.addRecord(result.toArray(new String[result.size()]));
+
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Analyze the scenario where a vertex may need increased parallelism.
+   *
+   * @param attemptInfo task attempt to examine
+   */
+  private void analyzeForParallelism(TaskAttemptInfo attemptInfo) {
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_GROUPS.toString());
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+        .REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      if (entry.getKey().equals(TaskCounter.class.getName())) {
+        //TODO: Tez counters always end up adding fgroups and groups, due to which we end up
+        // getting TaskCounter details as well.
+        continue;
+      }
+
+      String counterGroup = entry.getKey();
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+          .get(counterGroup).getValue() : 0;
+      long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get
+          (counterGroup).getValue() : 0;
+
+      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+      //Case 3: shuffle bytes above the configured threshold, and ratio between the configured
+      // min and max. Consider increasing parallelism based on the task attempt runtime.
+      if (shuffleBytesPerSource > maxShuffleBytesPerSource) {
+        if (ratio > minRatio && ratio < maxRatio) {
+          List<String> result = Lists.newLinkedList();
+          result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+          result.add(attemptInfo.getTaskAttemptId());
+          result.add(counterGroup);
+          result.add(attemptInfo.getNodeId());
+          result.add(inputGroupsCount + "");
+          result.add(inputRecordsCount + "");
+          result.add(ratio + "");
+          result.add(shuffleBytesPerSource + "");
+          result.add(attemptInfo.getTimeTaken() + "");
+          result.add("Consider increasing parallelism.");
+
+          csvResult.addRecord(result.toArray(new String[result.size()]));
+        }
+      }
+    }
+
+
+  }
+
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Skew Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyzer reducer skews by mining reducer task counters";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    SkewAnalyzer analyzer = new SkewAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
new file mode 100644
index 0000000..a810a8a
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * Provides the set of nodes that participated in the DAG, in descending order of task
+ * execution time.
+ * <p/>
+ * Combine it with other counters to understand slow nodes better.
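+ * <p/>
+ * For example (hypothetical numbers): a node whose avgGCTimeMillis is far above the cluster
+ * average, with a similar number of tasks executed, is a candidate for memory-pressure
+ * investigation.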
+ */
+public class SlowNodeAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  private static final Log LOG = LogFactory.getLog(SlowNodeAnalyzer.class);
+
+  private static final String[] headers = { "nodeName", "noOfTasksExecuted", "noOfKilledTasks",
+      "noOfFailedTasks", "avgSucceededTaskExecutionTime", "avgKilledTaskExecutionTime",
+      "avgFailedTaskExecutionTime", "avgHDFSBytesRead", "avgHDFSBytesWritten",
+      "avgFileBytesRead", "avgFileBytesWritten", "avgGCTimeMillis", "avgCPUTimeMillis" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  public SlowNodeAnalyzer(Configuration config) {
+    this.config = config;
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Multimap<String, TaskAttemptInfo> nodeDetails = dagInfo.getNodeDetails();
+    for (String nodeName : nodeDetails.keySet()) {
+      List<String> record = Lists.newLinkedList();
+
+      Collection<TaskAttemptInfo> taskAttemptInfos = nodeDetails.get(nodeName);
+
+      record.add(nodeName);
+      record.add(taskAttemptInfos.size() + "");
+      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.KILLED) + "");
+      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.FAILED) + "");
+
+      Iterable<TaskAttemptInfo> succeedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.SUCCEEDED);
+      record.add(getAvgTaskExecutionTime(succeedTasks) + "");
+
+      Iterable<TaskAttemptInfo> killedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.KILLED);
+      record.add(getAvgTaskExecutionTime(killedTasks) + "");
+
+      Iterable<TaskAttemptInfo> failedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.FAILED);
+      record.add(getAvgTaskExecutionTime(failedTasks) + "");
+
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.HDFS_BYTES_READ.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.HDFS_BYTES_WRITTEN.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.FILE_BYTES_READ.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.FILE_BYTES_WRITTEN.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+          .getName(), TaskCounter.GC_TIME_MILLIS.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+          .getName(), TaskCounter.CPU_MILLISECONDS.name()) + "");
+
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+  }
+
+  private Iterable<TaskAttemptInfo> getFilteredTaskAttempts(Collection<TaskAttemptInfo>
+      taskAttemptInfos, final TaskAttemptState status) {
+    return Iterables.filter(taskAttemptInfos, new
+        Predicate<TaskAttemptInfo>() {
+          @Override public boolean apply(TaskAttemptInfo input) {
+            return input.getStatus().equalsIgnoreCase(status.toString());
+          }
+        });
+  }
+
+  private float getAvgTaskExecutionTime(Iterable<TaskAttemptInfo> taskAttemptInfos) {
+    long totalTime = 0;
+    int size = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      totalTime += attemptInfo.getTimeTaken();
+      size++;
+    }
+    return (size > 0) ? (totalTime * 1.0f / size) : 0;
+  }
+
+  private int getNumberOfTasks(Collection<TaskAttemptInfo> taskAttemptInfos, TaskAttemptState
+      status) {
+    int tasks = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      if (attemptInfo.getStatus().equalsIgnoreCase(status.toString())) {
+        tasks++;
+      }
+    }
+    return tasks;
+  }
+
+  private float getAvgCounter(Collection<TaskAttemptInfo> taskAttemptInfos, String
+      counterGroupName, String counterName) {
+    long total = 0;
+    int taskCount = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      TezCounters tezCounters = attemptInfo.getTezCounters();
+      TezCounter counter = tezCounters.findCounter(counterGroupName, counterName);
+      if (counter != null) {
+        total += counter.getValue();
+        taskCount++;
+      } else {
+        LOG.info("Could not find counterGroupName=" + counterGroupName + ", counter=" +
+            counterName + " in " + attemptInfo);
+      }
+    }
+    return (taskCount > 0) ? (total * 1.0f / taskCount) : 0;
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Slow Node Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Analyze node details for the DAG.").append("\n");
+    sb.append("This could be used to find out the set of nodes where the tasks are taking more "
+        + "time on average.").append("\n");
+    sb.append("This could be used to find out the set of nodes where the tasks are taking more "
+        + "time on average and to understand whether too many tasks got scheduled on a node.")
+        .append("\n");
+    sb.append("One needs to combine the task execution time with other metrics like bytes "
+        + "read/written etc to get better idea of bad nodes. In order to understand the slow "
+        + "nodes due to network, it might be worthwhile to consider the shuffle performance "
+        + "analyzer tool in tez-tools").append("\n");
+    return sb.toString();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    SlowNodeAnalyzer analyzer = new SlowNodeAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
new file mode 100644
index 0000000..d2474ad
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * Analyze slow tasks in the DAG. Top 100 tasks are listed by default.
+ *
+ * <p/>
+ * //TODO: We do not get counters for killed task attempts yet.
+ */
+public class SlowTaskIdentifier extends TezAnalyzerBase implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttemptId",
+      "Node", "taskDuration", "Status", "diagnostics",
+      "NoOfInputs" };
+
+  private final CSVResult csvResult;
+
+  private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count";
+  private static final int NO_OF_TASKS_DEFAULT = 100;
+
+  private final Configuration config;
+
+  public SlowTaskIdentifier(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    List<TaskAttemptInfo> taskAttempts = Lists.newArrayList();
+    for(VertexInfo vertexInfo : dagInfo.getVertices()) {
+      taskAttempts.addAll(vertexInfo.getTaskAttempts());
+    }
+
+    //sort them by runtime in descending order
+    Collections.sort(taskAttempts, new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getTimeTaken() > o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+                0 : 1);
+      }
+    });
+
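+    // Cap the report at the configured task count (default 100), never below zero.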
+    int limit = Math.min(taskAttempts.size(),
+        Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+
+    if (limit == 0) {
+      return;
+    }
+
+    for (int i = 0; i < limit; i++) {
+      List<String> record = Lists.newLinkedList();
+      record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName());
+      record.add(taskAttempts.get(i).getTaskAttemptId());
+      record.add(taskAttempts.get(i).getContainer().getHost());
+      record.add(taskAttempts.get(i).getTimeTaken() + "");
+      record.add(taskAttempts.get(i).getStatus());
+      record.add(taskAttempts.get(i).getDiagnostics());
+      record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + "");
+
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Slow Task Identifier";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Identifies slow tasks in the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    SlowTaskIdentifier analyzer = new SlowTaskIdentifier(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
new file mode 100644
index 0000000..33f2421
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify the slowest vertex in the DAG.
+ */
+public class SlowestVertexAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
+      "shuffleTime_Max", "shuffleTime_MaxSource", "LastEventReceived", "LastEventReceivedFrom",
+      "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+      "observation", "comments" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+  private final MetricRegistry metrics = new MetricRegistry();
+  private Histogram taskAttemptRuntimeHistogram;
+
+  private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime";
+  private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000;
+
+  private final long vertexRuntimeThreshold;
+
+  public SlowestVertexAnalyzer(Configuration config) {
+    this.config = config;
+    this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
+        MAX_VERTEX_RUNTIME_DEFAULT));
+
+  }
+
+  private long getTaskRuntime(VertexInfo vertexInfo) {
+    TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart();
+    TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish();
+
+    DagInfo dagInfo = vertexInfo.getDagInfo();
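+    // Wall-clock span from the first task start to the last task finish (DAG start/finish
+    // times serve as fallbacks).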
+    long totalTime = ((lastTaskToFinish == null) ?
+        dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) -
+        ((firstTaskToStart == null) ? dagInfo.getStartTime() : firstTaskToStart.getStartTime());
+    return totalTime;
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+      if (vertexInfo.getFirstTaskToStart() == null || vertexInfo.getLastTaskToFinish() == null) {
+        continue;
+      }
+
+      long totalTime = getTaskRuntime(vertexInfo);
+
+      long slowestLastEventTime = Long.MIN_VALUE;
+      String maxSourceName = "";
+      taskAttemptRuntimeHistogram = metrics.histogram(vertexName);
+
+
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+
+        taskAttemptRuntimeHistogram.update(attemptInfo.getTimeTaken());
+
+        //Get the last event received from the incoming vertices
+        Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter(
+            TaskCounter.LAST_EVENT_RECEIVED.toString());
+
+        for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) {
+          if (entry.getKey().equals(TaskCounter.class.getName())) {
+            //TODO: Tez counters always end up adding fgroups and groups, due to which we end up
+            // getting TaskCounter details as well.
+            continue;
+          }
+          //Find the slowest last event received
+          if (entry.getValue().getValue() > slowestLastEventTime) {
+            slowestLastEventTime = entry.getValue().getValue();
+            maxSourceName = entry.getKey();
+          }
+        }
+      }
+
+      long shuffleMax = Long.MIN_VALUE;
+      String shuffleMaxSource = "";
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //Get the last event received from the incoming vertices
+        Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter(
+            TaskCounter.SHUFFLE_PHASE_TIME.toString());
+
+        for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) {
+          if (entry.getKey().equals(TaskCounter.class.getName())) {
+            //ignore. TODO: hack for taskcounter issue
+            continue;
+          }
+          //Find the slowest last event received
+          if (entry.getValue().getValue() > shuffleMax) {
+            shuffleMax = entry.getValue().getValue();
+            shuffleMaxSource = entry.getKey();
+          }
+        }
+      }
+
+      String comments = "";
+
+      List<String> record = Lists.newLinkedList();
+      record.add(vertexName);
+      record.add(vertexInfo.getTaskAttempts().size() + "");
+      record.add(totalTime + "");
+      record.add(Math.max(0, shuffleMax) + "");
+      record.add(shuffleMaxSource);
+      record.add(Math.max(0, slowestLastEventTime) + "");
+      record.add(maxSourceName);
+      //Finding out real work done at the vertex level might be meaningless (it is quite
+      // possible that the vertex was starved).
+
+      StringBuilder sb = new StringBuilder();
+      double percentile75 = taskAttemptRuntimeHistogram.getSnapshot().get75thPercentile();
+      double percentile95 = taskAttemptRuntimeHistogram.getSnapshot().get95thPercentile();
+      double percentile98 = taskAttemptRuntimeHistogram.getSnapshot().get98thPercentile();
+      double percentile99 = taskAttemptRuntimeHistogram.getSnapshot().get99thPercentile();
+      double medianAttemptRuntime = taskAttemptRuntimeHistogram.getSnapshot().getMedian();
+
+      record.add("75th=" + percentile75);
+      record.add("95th=" + percentile95);
+      record.add("98th=" + percentile98);
+      record.add("median=" + medianAttemptRuntime);
+
+      if (percentile75 / percentile99 < 0.5) {
+        //looks like some straggler task is there.
+        sb.append("Looks like some straggler task is there");
+      }
+
+      record.add(sb.toString());
+
+      if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
+        if ((shuffleMax * 1.0f / totalTime) > 0.5) {
+          if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) {
+            comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
+                + " event received";
+          } else {
+            comments =
+                "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
+          }
+        } else {
+          if (totalTime > vertexRuntimeThreshold) { //greater than the threshold (in ms).
+            comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold
+                + " ms)";
+          }
+        }
+      }
+
+      record.add(comments);
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+  }
+
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "SlowVertexAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Identify the slowest vertex in the DAG, which needs to be looked into first";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
new file mode 100644
index 0000000..d69ca23
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Find out tasks which spilled more than once (ADDITIONAL_SPILL_COUNT > 1).
+ * <p/>
+ * Only tasks which also wrote a significant amount of data (OUTPUT_BYTES above a
+ * configurable threshold, 1 GB by default) are reported.
+ */
+public class SpillAnalyzerImpl extends TezAnalyzerBase implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttemptId",
+      "Node", "counterGroupName",
+      "spillCount", "taskDuration",
+      "OUTPUT_BYTES", "OUTPUT_RECORDS",
+      "SPILLED_RECORDS", "Recommendation" };
+
+  private final CSVResult csvResult;
+
+  /**
+   * Minimum output bytes that should be churned out by a task before it is reported.
+   */
+  private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes"
+      + ".threshold";
+  private static final long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1024 * 1024 * 1024L;
+
+  private final long minOutputBytesPerTask;
+
+  private final Configuration config;
+
+  public SpillAnalyzerImpl(Configuration config) {
+    this.config = config;
+    minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
+        OUTPUT_BYTES_THRESHOLD_DEFAULT));
+    this.csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //Get ADDITIONAL_SPILL_COUNT, OUTPUT_BYTES for every source
+        Map<String, TezCounter> spillCountMap =
+            attemptInfo.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name());
+        Map<String, TezCounter> spilledRecordsMap =
+            attemptInfo.getCounter(TaskCounter.SPILLED_RECORDS.name());
+        Map<String, TezCounter> outputRecordsMap =
+            attemptInfo.getCounter(TaskCounter.OUTPUT_RECORDS.name());
+
+        Map<String, TezCounter> outputBytesMap =
+            attemptInfo.getCounter(TaskCounter.OUTPUT_BYTES.name());
+
+        for (Map.Entry<String, TezCounter> entry : spillCountMap.entrySet()) {
+          String source = entry.getKey();
+          long spillCount = entry.getValue().getValue();
+          long outBytes = outputBytesMap.get(source).getValue();
+
+          long outputRecords = outputRecordsMap.get(source).getValue();
+          long spilledRecords = spilledRecordsMap.get(source).getValue();
+
+          if (spillCount > 1 && outBytes > minOutputBytesPerTask) {
+            List<String> recordList = Lists.newLinkedList();
+            recordList.add(vertexName);
+            recordList.add(attemptInfo.getTaskAttemptId());
+            recordList.add(attemptInfo.getNodeId());
+            recordList.add(source);
+            recordList.add(spillCount + "");
+            recordList.add(attemptInfo.getTimeTaken() + "");
+            recordList.add(outBytes + "");
+            recordList.add(outputRecords + "");
+            recordList.add(spilledRecords + "");
+            recordList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
+                + ". Try increasing container size.");
+
+            csvResult.addRecord(recordList.toArray(new String[recordList.size()]));
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "SpillAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze spill details in the task";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
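
For reference, a minimal sketch of driving this analyzer with a lowered reporting
threshold, using the command-line options defined in TezAnalyzerBase below. The DAG id
and event file path are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.tez.analyzer.plugins.SpillAnalyzerImpl;

    public class SpillAnalyzerDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Report tasks that wrote more than 100 MB instead of the 1 GB default.
        config.setLong("tez.spill-analyzer.min.output.bytes.threshold", 100L * 1024 * 1024);
        SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config);
        int res = ToolRunner.run(config, analyzer, new String[] {
            "--dagId=dag_1448000000000_0001_1",    // placeholder DAG id
            "--eventFileName=/tmp/dag_events.zip", // placeholder event file
            "--saveResults" });
        analyzer.printResults();
        System.exit(res);
      }
    }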

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
new file mode 100644
index 0000000..070294f
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.TreeMultiset;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Analyze the number of tasks running concurrently in every vertex over time.
+ */
+public class TaskConcurrencyAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
+
+  private final CSVResult csvResult;
+  private final Configuration config;
+
+  public TaskConcurrencyAnalyzer(Configuration conf) {
+    this.csvResult = new CSVResult(headers);
+    this.config = conf;
+  }
+
+  private enum EventType {START, FINISH}
+
+  static class TimeInfo {
+    EventType eventType;
+    long timestamp;
+    int concurrentTasks;
+
+    public TimeInfo(EventType eventType, long timestamp) {
+      this.eventType = eventType;
+      this.timestamp = timestamp;
+    }
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    //For each vertex find the concurrent tasks running at any point
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      List<TaskAttemptInfo> taskAttempts =
+          Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null));
+
+      String vertexName = vertexInfo.getVertexName();
+
+      /**
+       * - Get a sorted multiset of timestamps (S1, S2,...E1, E2..). Multiple tasks can
+       * start/end at the same time.
+       * - Walk through the set
+       * - Increment concurrent tasks when a start event is encountered
+       * - Decrement concurrent tasks when a finish event is encountered
+       */
+      TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() {
+        @Override public int compare(TimeInfo o1, TimeInfo o2) {
+          return (o1.timestamp < o2.timestamp) ? -1 :
+              ((o1.timestamp == o2.timestamp) ? 0 : 1);
+        }
+      });
+
+      for (TaskAttemptInfo attemptInfo : taskAttempts) {
+        TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime());
+        TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime());
+
+        timeInfoSet.add(startTimeInfo);
+        timeInfoSet.add(stopTimeInfo);
+      }
+
+      //Compute concurrent tasks in the list now.
+      int concurrentTasks = 0;
+      for(TimeInfo timeInfo : timeInfoSet.elementSet()) {
+        switch (timeInfo.eventType) {
+        case START:
+          concurrentTasks += timeInfoSet.count(timeInfo);
+          break;
+        case FINISH:
+          concurrentTasks -= timeInfoSet.count(timeInfo);
+          break;
+        default:
+          break;
+        }
+        timeInfo.concurrentTasks = concurrentTasks;
+        addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks);
+      }
+    }
+  }
+
+  private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
+    String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
+    csvResult.addRecord(record);
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "TaskConcurrencyAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze how many tasks were running in every vertex at given point in time. This "
+        + "would be helpful in understanding whether any starvation was there or not.";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
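
For reference, a standalone sketch of the sweep used above: start/finish events are
collected in a timestamp-ordered multiset and a running count of open intervals is
maintained. The attempt intervals below are hypothetical.

    import com.google.common.collect.TreeMultiset;

    import java.util.Comparator;

    public class ConcurrencySweepSketch {
      enum EventType { START, FINISH }

      static class Event {
        final EventType type;
        final long timestamp;

        Event(EventType type, long timestamp) {
          this.type = type;
          this.timestamp = timestamp;
        }
      }

      public static void main(String[] args) {
        TreeMultiset<Event> events = TreeMultiset.create(new Comparator<Event>() {
          @Override public int compare(Event o1, Event o2) {
            return (o1.timestamp < o2.timestamp) ? -1
                : ((o1.timestamp == o2.timestamp) ? 0 : 1);
          }
        });
        long[][] attempts = { { 0, 10 }, { 2, 8 }, { 4, 12 } }; // {start, finish}
        for (long[] a : attempts) {
          events.add(new Event(EventType.START, a[0]));
          events.add(new Event(EventType.FINISH, a[1]));
        }
        int concurrent = 0;
        for (Event e : events.elementSet()) {
          // count(e) groups events that share a timestamp (per the comparator).
          int n = events.count(e);
          concurrent += (e.type == EventType.START) ? n : -n;
          System.out.println("t=" + e.timestamp + " -> " + concurrent + " running");
        }
      }
    }

This prints 1, 2 and 3 running at t=0, 2 and 4, then 2, 1 and 0 as the attempts finish.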

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
new file mode 100644
index 0000000..73e731a
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.ATSImportTool;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+import com.google.common.base.Preconditions;
+
+public abstract class TezAnalyzerBase extends Configured implements Tool, Analyzer {
+
+  
+  private static final String EVENT_FILE_NAME = "eventFileName";
+  private static final String OUTPUT_DIR = "outputDir";
+  private static final String SAVE_RESULTS = "saveResults";
+  private static final String DAG_ID = "dagId";
+  private static final String FROM_SIMPLE_HISTORY = "fromSimpleHistory";
+  private static final String HELP = "help";
+
+  private static final int SEPARATOR_WIDTH = 80;
+  private static final int MIN_COL_WIDTH = 12;
+
+  private String outputDir;
+  private boolean saveResults = false;
+  
+  @SuppressWarnings("static-access")
+  private static Options buildOptions() {
+    Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+        .withDescription("DagId that needs to be analyzed").hasArg().isRequired(true).create();
+
+    Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR)
+        .withDescription("Directory to write outputs to.").hasArg().isRequired(false).create();
+
+    Option saveResults = OptionBuilder.withArgName(SAVE_RESULTS).withLongOpt(SAVE_RESULTS)
+        .withDescription("Saves results to output directory (optional)")
+        .hasArg(false).isRequired(false).create();
+
+    Option eventFileNameOption = OptionBuilder.withArgName(EVENT_FILE_NAME)
+        .withLongOpt(EVENT_FILE_NAME)
+        .withDescription("File with event data for the DAG").hasArg()
+        .isRequired(false).create();
+    
+    Option fromSimpleHistoryOption = OptionBuilder.withArgName(FROM_SIMPLE_HISTORY)
+        .withLongOpt(FROM_SIMPLE_HISTORY)
+        .withDescription("Event data from Simple History logging. Must also specify event file")
+        .isRequired(false).create();
+    
+    Option help = OptionBuilder.withArgName(HELP)
+        .withLongOpt(HELP)
+        .withDescription("print help")
+        .isRequired(false).create();
+
+    Options opts = new Options();
+    opts.addOption(dagIdOption);
+    opts.addOption(outputDirOption);
+    opts.addOption(saveResults);
+    opts.addOption(eventFileNameOption);
+    opts.addOption(fromSimpleHistoryOption);
+    opts.addOption(help);
+    return opts;
+  }
+  
+  protected String getOutputDir() {
+    return outputDir;
+  }
+  
+  private void printUsage() {
+    System.err.println("Analyzer base options are");
+    Options options = buildOptions();
+    for (Object obj : options.getOptions()) {
+      Option option = (Option) obj;
+      System.err.println(option.getArgName() + " : " + option.getDescription());
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    //Parse downloaded contents
+    CommandLine cmdLine = null;
+    try {
+      cmdLine = new GnuParser().parse(buildOptions(), args);
+    } catch (ParseException e) {
+      System.err.println("Invalid options on command line");
+      printUsage();
+      return -1;
+    }
+    if (cmdLine.hasOption(HELP)) {
+      printUsage();
+      return 0;
+    }
+
+    saveResults = cmdLine.hasOption(SAVE_RESULTS);
+
+    outputDir = cmdLine.getOptionValue(OUTPUT_DIR);
+    if (outputDir == null) {
+      outputDir = System.getProperty("user.dir");
+    }
+
+    File file = null;
+    if (cmdLine.hasOption(EVENT_FILE_NAME)) {
+      file = new File(cmdLine.getOptionValue(EVENT_FILE_NAME));
+    }
+    
+    String dagId = cmdLine.getOptionValue(DAG_ID);
+    
+    DagInfo dagInfo = null;
+    
+    if (file == null) {
+      if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+        System.err.println("Event file name must be specified when using simple history");
+        printUsage();
+        return -2;
+      }
+      // using ATS - try to download directly
+      String[] importArgs = { "--dagId=" + dagId, "--downloadDir=" + outputDir };
+
+      int result = ATSImportTool.process(importArgs);
+      if (result != 0) {
+        System.err.println("Error downloading data from ATS");
+        return -3;
+      }
+
+      //Parse ATS data and verify results
+      //Parse downloaded contents
+      file = new File(outputDir
+          + Path.SEPARATOR + dagId
+          + Path.SEPARATOR + dagId + ".zip");
+    }
+    
+    Preconditions.checkState(file != null);
+    if (!cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+      ATSFileParser parser = new ATSFileParser(file);
+      dagInfo = parser.getDAGData(dagId);
+    } else {
+      SimpleHistoryParser parser = new SimpleHistoryParser(file);
+      dagInfo = parser.getDAGData(dagId);
+    }
+    Preconditions.checkState(dagInfo.getDagId().equals(dagId));
+    analyze(dagInfo);
+    Result result = getResult();
+    if (saveResults && (result instanceof CSVResult)) {
+      String fileName = outputDir + File.separator
+          + this.getClass().getName() + "_" + dagInfo.getDagId() + ".csv";
+      ((CSVResult) result).dumpToFile(fileName);
+      System.out.println("Saved results in " + fileName);
+    }
+    return 0;
+  }
+
+  public void printResults() throws TezException {
+    Result result = getResult();
+    if (result instanceof CSVResult) {
+      String[] headers = ((CSVResult) result).getHeaders();
+
+      StringBuilder formatBuilder = new StringBuilder();
+      int size = Math.max(MIN_COL_WIDTH, SEPARATOR_WIDTH / headers.length);
+      for (int i = 0; i < headers.length; i++) {
+        formatBuilder.append("%-").append(size).append("s ");
+      }
+      String format = formatBuilder.toString();
+
+      StringBuilder separator = new StringBuilder();
+      for (int i = 0; i < SEPARATOR_WIDTH; i++) {
+        separator.append("-");
+      }
+
+      System.out.println(separator);
+      System.out.println(String.format(format, (Object[]) headers));
+      System.out.println(separator);
+
+      Iterator<String[]> recordsIterator = ((CSVResult) result).getRecordsIterator();
+      while (recordsIterator.hasNext()) {
+        String line = String.format(format, (String[]) recordsIterator.next());
+        System.out.println(line);
+      }
+      System.out.println(separator);
+    }
+  }
+}
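
For reference, a minimal sketch of a new analyzer built on this base class; the class
name and the emitted columns are illustrative, not part of the patch. Once compiled it
can be run with the options parsed above, e.g. --dagId=<id> --eventFileName=<zip>
--saveResults.

    package org.apache.tez.analyzer.plugins;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.tez.analyzer.Analyzer;
    import org.apache.tez.analyzer.CSVResult;
    import org.apache.tez.dag.api.TezException;
    import org.apache.tez.history.parser.datamodel.DagInfo;
    import org.apache.tez.history.parser.datamodel.VertexInfo;

    public class AttemptCountAnalyzer extends TezAnalyzerBase implements Analyzer {
      private static final String[] headers = { "vertexName", "numTaskAttempts" };

      private final CSVResult csvResult = new CSVResult(headers);
      private final Configuration config;

      public AttemptCountAnalyzer(Configuration config) {
        this.config = config;
      }

      @Override
      public void analyze(DagInfo dagInfo) throws TezException {
        for (VertexInfo vertexInfo : dagInfo.getVertices()) {
          csvResult.addRecord(new String[] { vertexInfo.getVertexName(),
              vertexInfo.getTaskAttempts().size() + "" });
        }
      }

      @Override
      public CSVResult getResult() throws TezException {
        return csvResult;
      }

      @Override
      public String getName() {
        return "AttemptCountAnalyzer";
      }

      @Override
      public String getDescription() {
        return "Count task attempts per vertex (illustrative only)";
      }

      @Override
      public Configuration getConfiguration() {
        return config;
      }

      public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        AttemptCountAnalyzer analyzer = new AttemptCountAnalyzer(config);
        int res = ToolRunner.run(config, analyzer, args);
        analyzer.printResults();
        System.exit(res);
      }
    }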

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
new file mode 100644
index 0000000..06b8983
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify the set of vertices which fall on the critical path of a DAG.
+ */
+public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
+  private final Configuration config;
+
+  private static final String[] headers = { "CriticalPath", "Score" };
+
+  private final CSVResult csvResult;
+
+  private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+  private final String dotFileLocation;
+
+  private static final String CONNECTOR = "-->";
+
+  public VertexLevelCriticalPathAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
+  }
+
+  @Override public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Long> result = Maps.newLinkedHashMap();
+    //Walk backwards from the last vertex, which is assumed to be the terminal vertex of the DAG
+    getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+
+    Map<String, Long> sortedByValues = sortByValues(result);
+    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
+      List<String> record = Lists.newLinkedList();
+      record.add(entry.getKey());
+      record.add(entry.getValue() + "");
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+
+    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+    try {
+      List<String> criticalVertices = null;
+      if (!sortedByValues.isEmpty()) {
+        String criticalPath = sortedByValues.keySet().iterator().next();
+        criticalVertices = getVertexNames(criticalPath);
+      } else {
+        criticalVertices = Lists.newLinkedList();
+      }
+      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "CriticalPathAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze vertex level critical path of the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  private static Map<String, Long> sortByValues(Map<String, Long> result) {
+    //Sort the result by time taken, in descending order
+    final Ordering<String> reverseValueOrdering =
+        Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
+    Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reverseValueOrdering);
+    return orderedMap;
+  }
+
+  private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
+      Map<String, Long> result) {
+    String destVertexName = (dest != null) ? (dest.getVertexName()) : "";
+
+    if (dest != null) {
+      time += dest.getTimeTaken();
+      predecessor += destVertexName + CONNECTOR;
+
+      for (VertexInfo incomingVertex : dest.getInputVertices()) {
+        getCriticalPath(predecessor, incomingVertex, time, result);
+      }
+
+      result.put(predecessor, time);
+    }
+  }
+
+  private static List<String> getVertexNames(String criticalPath) {
+    if (Strings.isNullOrEmpty(criticalPath)) {
+      return Lists.newLinkedList();
+    }
+    return Lists.newLinkedList(
+        Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split(criticalPath));
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    VertexLevelCriticalPathAnalyzer analyzer = new VertexLevelCriticalPathAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
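
For reference, a standalone sketch of the recursion in getCriticalPath(), using a plain
map-based DAG in place of VertexInfo; vertex names and times taken are hypothetical.

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;

    import java.util.List;
    import java.util.Map;

    public class CriticalPathSketch {
      private static final String CONNECTOR = "-->";

      //parents: vertex -> its input (upstream) vertices
      static void walk(String path, String vertex, long time, Map<String, Long> timeTaken,
          Map<String, List<String>> parents, Map<String, Long> result) {
        time += timeTaken.get(vertex);
        path += vertex + CONNECTOR;
        for (String parent : parents.get(vertex)) {
          walk(path, parent, time, timeTaken, parents, result);
        }
        result.put(path, time);
      }

      public static void main(String[] args) {
        Map<String, Long> timeTaken = Maps.newHashMap();
        timeTaken.put("Map 1", 40L);
        timeTaken.put("Map 2", 5L);
        timeTaken.put("Reducer 3", 20L);

        Map<String, List<String>> parents = Maps.newHashMap();
        parents.put("Map 1", Lists.<String>newLinkedList());
        parents.put("Map 2", Lists.<String>newLinkedList());
        parents.put("Reducer 3", Lists.newArrayList("Map 1", "Map 2"));

        Map<String, Long> result = Maps.newLinkedHashMap();
        //Walk backwards from the terminal vertex, as the analyzer does.
        walk("", "Reducer 3", 0, timeTaken, parents, result);
        //The path with the highest score ("Reducer 3-->Map 1-->" with 60) is critical.
        for (Map.Entry<String, Long> entry : result.entrySet()) {
          System.out.println(entry.getKey() + " : " + entry.getValue());
        }
      }
    }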