You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/12 02:31:51 UTC

tez git commit: TEZ-2810. Support for showing allocation delays due to internal preemption (bikas)

Repository: tez
Updated Branches:
  refs/heads/master e1fc9cc4f -> b6f15dcdc


TEZ-2810. Support for showing allocation delays due to internal preemption (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b6f15dcd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b6f15dcd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b6f15dcd

Branch: refs/heads/master
Commit: b6f15dcdcf3cae841c361983f32229858ff79f42
Parents: e1fc9cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 11 17:31:34 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 11 17:31:34 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../history/parser/datamodel/VertexInfo.java    |   1 +
 .../analyzer/plugins/CriticalPathAnalyzer.java  | 176 ++++++++++++-------
 .../org/apache/tez/analyzer/utils/SVGUtils.java |  48 +++--
 .../org/apache/tez/analyzer/TestAnalyzer.java   |  95 ++++++++--
 5 files changed, 226 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00b8282..60010d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2810. Support for showing allocation delays due to internal preemption
   TEZ-2808. Race condition between preemption and container assignment
   TEZ-2807. Log data in the finish event instead of the start event
   TEZ-2799. SimpleHistoryParser NPE

http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index 94547d4..7259667 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -403,6 +403,7 @@ public class VertexInfo extends BaseInfo {
     return Collections.unmodifiableList(outputVertices);
   }
 
+  // expensive method to call for large DAGs as it creates big lists on every call
   private List<TaskAttemptInfo> getTaskAttemptsInternal() {
     List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
     for (TaskInfo taskInfo : getTasks()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 350f783..3eb7701 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -33,11 +33,13 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent
 import org.apache.tez.analyzer.utils.SVGUtils;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.history.parser.datamodel.Container;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -112,13 +114,15 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
     TaskAttemptInfo lastAttempt = null;
     long lastAttemptFinishTime = 0;
     for (VertexInfo vertex : dagInfo.getVertices()) {
-      for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
-        attempts.put(attempt.getTaskAttemptId(), attempt);
-        if (attempt.getStatus().equals(succeededState) ||
-            attempt.getStatus().equals(failedState)) {
-          if (lastAttemptFinishTime < attempt.getFinishTime()) {
-            lastAttempt = attempt;
-            lastAttemptFinishTime = attempt.getFinishTime();
+      for (TaskInfo task : vertex.getTasks()) {
+        for (TaskAttemptInfo attempt : task.getTaskAttempts()) { 
+          attempts.put(attempt.getTaskAttemptId(), attempt);
+          if (attempt.getStatus().equals(succeededState) ||
+              attempt.getStatus().equals(failedState)) {
+            if (lastAttemptFinishTime < attempt.getFinishTime()) {
+              lastAttempt = attempt;
+              lastAttemptFinishTime = attempt.getFinishTime();
+            }
           }
         }
       }
@@ -149,68 +153,112 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
     svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
   }
   
-  private void analyzeCriticalPath(DagInfo dag) {
-    if (!criticalPath.isEmpty()) {
-      System.out.println("Walking critical path for dag " + dag.getDagId());
-      long dagStartTime = dag.getStartTime();
-      long dagTime = dag.getFinishTime() - dagStartTime;
-      long totalAttemptCriticalTime = 0;
-      for (int i = 0; i < criticalPath.size(); ++i) {
-        CriticalPathStep step = criticalPath.get(i);
-        totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
-        TaskAttemptInfo attempt = step.attempt;
-        if (step.getType() == EntityType.ATTEMPT) {
-          // analyze execution overhead
-          long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
-              .getAvgExecutionTimeInterval();
-          if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
-            step.notes
-                .add("Potential straggler. Execution time " + 
-                    SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
-                    + " compared to vertex average of " + 
-                    SVGUtils.getTimeStr(avgExecutionTime));
+  private void analyzeAllocationOverhead(DagInfo dag) {
+    List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
+    for (VertexInfo v : dag.getVertices()) {
+      for (TaskInfo t : v.getTasks()) {
+        for (TaskAttemptInfo a : t.getTaskAttempts()) {
+          if (a.getTerminationCause().equals(
+              TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name())) {
+            System.out.println("Found preempted attempt " + a.getTaskAttemptId());
+            preemptedAttempts.add(a);
           }
-          
-          if (attempt.getStartTime() > step.startCriticalPathTime) {
-            // the attempt is critical before launching. So allocation overhead needs analysis
-            // analyzer allocation overhead
-            Container container = attempt.getContainer();
-            if (container != null) {
-              Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
-              if (attempts != null && !attempts.isEmpty()) {
-                // arrange attempts by allocation time
-                List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
-                Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
-                // walk the list to record allocation time before the current attempt
-                long containerPreviousAllocatedTime = 0;
-                for (TaskAttemptInfo containerAttempt : attemptsList) {
-                  if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
-                    break;
-                  }
-                  System.out.println("Container: " + container.getId() + " running att: " + 
-                  containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
-                  containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
-                }
-                if (containerPreviousAllocatedTime == 0) {
-                  step.notes.add("Container " + container.getId() + " newly allocated.");
-                } else {
-                  if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
-                    step.notes.add("Container " + container.getId() + " was fully allocated");
-                  } else {
-                    step.notes.add("Container " + container.getId() + " allocated for " + 
-                    SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
-                        SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + 
-                        " of allocation wait time");
-                  }
-                }
-              }
+        }
+      }
+    }
+    for (int i = 0; i < criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i);
+      TaskAttemptInfo attempt = step.attempt;
+      if (step.getType() != EntityType.ATTEMPT) {
+        continue;
+      }
+      
+      long creationTime = attempt.getCreationTime();
+      long allocationTime = attempt.getAllocationTime();
+      if (allocationTime < step.startCriticalPathTime) {
+        // allocated before it became critical
+        continue;
+      }
+
+      // the attempt is critical before allocation. So allocation overhead needs analysis
+      Container container = attempt.getContainer();
+      if (container != null) {
+        Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
+        if (attempts != null && !attempts.isEmpty()) {
+          // arrange attempts by allocation time
+          List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
+          Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
+          // walk the list to record allocation time before the current attempt
+          long containerPreviousAllocatedTime = 0;
+          for (TaskAttemptInfo containerAttempt : attemptsList) {
+            if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
+              break;
+            }
+            System.out.println("Container: " + container.getId() + " running att: " + 
+            containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
+            containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
+          }
+          if (containerPreviousAllocatedTime == 0) {
+            step.notes.add("Container " + container.getId() + " newly allocated.");
+          } else {
+            if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
+              step.notes.add("Container " + container.getId() + " was fully allocated");
+            } else {
+              step.notes.add("Container " + container.getId() + " allocated for " + 
+              SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
+                  SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + 
+                  " of allocation wait time");
             }
           }
         }
+        // look for internal preemptions while attempt was waiting for allocation
+        for (TaskAttemptInfo a : preemptedAttempts) {
+          if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime){
+            // found an attempt that was preempted within this time interval
+            step.notes.add("Potentially waited for preemption of " + a.getShortName());
+          }
+        }
       }
-      System.out
-          .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
-              + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
+    }
+  }
+  
+  private void analyzeStragglers(DagInfo dag) {
+    long dagStartTime = dag.getStartTime();
+    long dagTime = dag.getFinishTime() - dagStartTime;
+    long totalAttemptCriticalTime = 0;
+    for (int i = 0; i < criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i);
+      totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
+      TaskAttemptInfo attempt = step.attempt;
+      if (step.getType() == EntityType.ATTEMPT) {
+        // analyze execution overhead
+        if (attempt.getLastDataEvents().size() > 1) {
+          // there were read errors. that could have delayed the attempt. ignore this
+          continue;
+        }
+        long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
+            .getAvgExecutionTimeInterval();
+        if (avgExecutionTime <= 0) {
+          continue;
+        }
+        if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
+          step.notes
+              .add("Potential straggler. Execution time " + 
+                  SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
+                  + " compared to vertex average of " + 
+                  SVGUtils.getTimeStr(avgExecutionTime));
+        }
+      }
+    }
+    System.out
+        .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
+            + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
+  }
+  
+  private void analyzeCriticalPath(DagInfo dag) {
+    if (!criticalPath.isEmpty()) {
+      analyzeStragglers(dag);
+      analyzeAllocationOverhead(dag);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 61b1676..88a2105 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -50,10 +50,11 @@ public class SVGUtils {
   private static final int STEP_GAP = 50;
   private static final int TEXT_SIZE = 20;
   private static final String RUNTIME_COLOR = "LightGreen";
+  private static final String ERROR_COLOR = "Tomato";
   private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod";
   private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon";
   private static final String BORDER_COLOR = "Sienna";
-  private static final String VERTEX_INIT_COMMIT_COLOR = "orange";
+  private static final String VERTEX_INIT_COMMIT_COLOR = "LightSalmon";
   private static final String CRITICAL_COLOR = "IndianRed";
   private static final float RECT_OPACITY = 1.0f;
   private static final String TITLE_BR = "&#13;";
@@ -162,8 +163,10 @@ public class SVGUtils {
       int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
       int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
       int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime);
-      int allocationTimeInterval = (int) (attempt.getAllocationTime() - dagStartTime);
-      int launchTimeInterval = (int) (attempt.getStartTime() - dagStartTime);
+      int allocationTimeInterval = attempt.getAllocationTime() > 0 ? 
+          (int) (attempt.getAllocationTime() - dagStartTime) : 0;
+      int launchTimeInterval = attempt.getStartTime() > 0 ? 
+          (int) (attempt.getStartTime() - dagStartTime) : 0;
       int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime);
       System.out.println(attempt.getTaskAttemptId() + " " + creationTimeInterval + " "
           + allocationTimeInterval + " " + launchTimeInterval + " " + finishTimeInterval);
@@ -178,22 +181,37 @@ public class SVGUtils {
       title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR);
       title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR);
       title.append("Created at: " + getTimeStr(creationTimeInterval)).append(TITLE_BR);
-      title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR);
-      title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR);
+      if (allocationTimeInterval > 0) {
+        title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR);
+      }
+      if (launchTimeInterval > 0) {
+        title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR);
+      }
       title.append("Finished at: " + getTimeStr(finishTimeInterval)).append(TITLE_BR);
       title.append(Joiner.on(TITLE_BR).join(step.getNotes()));
       String titleStr = title.toString();
 
-      addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval,
-          yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
-          titleStr);
-
-      addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval,
-          yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
-          titleStr);
-
-      addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP,
-          STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+      // handle cases when attempt fails before allocation or launch
+      if (allocationTimeInterval > 0) {
+        addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval,
+            yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+            titleStr);
+        if (launchTimeInterval > 0) {
+          addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval,
+              yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+              titleStr);          
+          addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP,
+              STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+        } else {
+          // no launch - so allocate to finish drawn
+          addRectStr(allocationTimeInterval, finishTimeInterval - allocationTimeInterval, yOffset * STEP_GAP,
+              STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);        
+        }
+      } else {
+        // no allocation - so create to finish drawn
+        addRectStr(creationTimeInterval, finishTimeInterval - creationTimeInterval, yOffset * STEP_GAP,
+            STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);        
+      }
 
       addTextStr((finishTimeInterval + creationTimeInterval) / 2,
           (yOffset * STEP_GAP + STEP_GAP / 2),   attempt.getShortName(), "middle", TEXT_SIZE, 

http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index f3a69a6..f54a15a 100644
--- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.history.ATSImportTool;
 import org.apache.tez.history.parser.ATSFileParser;
@@ -136,7 +137,7 @@ public class TestAnalyzer {
     conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
 
     miniTezCluster =
-        new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 4, 1, 1, true);
+        new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 1, 1, 1, true);
 
     miniTezCluster.init(conf);
     miniTezCluster.start();
@@ -161,15 +162,26 @@ public class TestAnalyzer {
   }
   
   StepCheck createStep(String attempt, CriticalPathDependency reason) {
-    return new StepCheck(attempt, reason);
+    return createStep(attempt, reason, null, null);
+  }
+  
+  StepCheck createStep(String attempt, CriticalPathDependency reason, 
+      TaskAttemptTerminationCause errCause, List<String> notes) {
+    return new StepCheck(attempt, reason, errCause, notes);
   }
   
   class StepCheck {
     String attempt; // attempt is the TaskAttemptInfo short name with regex
     CriticalPathDependency reason;
-    StepCheck(String attempt, CriticalPathDependency reason) {
+    TaskAttemptTerminationCause errCause;
+    List<String> notesStr;
+
+    StepCheck(String attempt, CriticalPathDependency reason, 
+        TaskAttemptTerminationCause cause, List<String> notes) {
       this.attempt = attempt;
       this.reason = reason;
+      this.errCause = cause;
+      this.notesStr = notes;
     }
     String getAttemptDetail() {
       return attempt;
@@ -177,6 +189,12 @@ public class TestAnalyzer {
     CriticalPathDependency getReason() {
       return reason;
     }
+    TaskAttemptTerminationCause getErrCause() {
+      return errCause;
+    }
+    List<String> getNotesStr() {
+      return notesStr;
+    }
   }
 
   DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception {
@@ -232,7 +250,8 @@ public class TestAnalyzer {
     for (CriticalPathStep step : criticalPath) {
       LOG.info("ABC Step: " + step.getType());
       if (step.getType() == EntityType.ATTEMPT) {
-        LOG.info("ABC Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
+        LOG.info("ABC Attempt: " + step.getAttempt().getShortName() 
+            + " " + step.getAttempt().getDetailedStatus());
       }
       LOG.info("ABC Reason: " + step.getReason());
       String notes = Joiner.on(";").join(step.getNotes());
@@ -248,12 +267,22 @@ public class TestAnalyzer {
             criticalPath.get(0).getAttempt().getShortName());
 
         for (int i=1; i<criticalPath.size() - 1; ++i) {
+          StepCheck check = steps[i-1];
           CriticalPathStep step = criticalPath.get(i);
           Assert.assertEquals(CriticalPathStep.EntityType.ATTEMPT, step.getType());
-          Assert.assertTrue(steps[i-1].getAttemptDetail(), 
-              step.getAttempt().getShortName().matches(steps[i-1].getAttemptDetail()));
-          //Assert.assertEquals(steps[i-1].getAttemptDetail(), step.getAttempt().getShortName());
+          Assert.assertTrue(check.getAttemptDetail(), 
+              step.getAttempt().getShortName().matches(check.getAttemptDetail()));
           Assert.assertEquals(steps[i-1].getReason(), step.getReason());
+          if (check.getErrCause() != null) {
+            Assert.assertEquals(check.getErrCause(),
+                TaskAttemptTerminationCause.valueOf(step.getAttempt().getTerminationCause()));
+          }
+          if (check.getNotesStr() != null) { // every expected note must be present
+            String notes = Joiner.on("#").join(step.getNotes());
+            for (String note : check.getNotesStr()) {
+              Assert.assertTrue(note, notes.contains(note));
+            }
+          }
         }
     
         Assert.assertEquals(CriticalPathStep.EntityType.DAG_COMMIT,
@@ -435,16 +464,17 @@ public class TestAnalyzer {
    * @param failAndExit whether input failure should trigger attempt exit 
    */
   private void setCascadingInputFailureConfig(Configuration testConf, 
-                                              boolean failAndExit) {
+                                              boolean failAndExit,
+                                              int numTasks) {
     // v2 attempt0 succeeds.
-    // v2 task0 attempt1 input0 fails up to version 0.
-    testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    // v2 all tasks attempt1 input0 fail up to version 0.
+    testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, numTasks);
     testConf.setBoolean(TestInput.getVertexConfName(
             TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
     testConf.setBoolean(TestInput.getVertexConfName(
             TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), failAndExit);
     testConf.set(TestInput.getVertexConfName(
-            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1");
     testConf.set(TestInput.getVertexConfName(
             TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "1");
     testConf.set(TestInput.getVertexConfName(
@@ -453,17 +483,17 @@ public class TestAnalyzer {
             TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"),
             0);
 
-    //v3 all-tasks attempt0 input0 fails up to version 0.
+    //v3 task0 attempt0 all inputs fails up to version 0.
     testConf.setBoolean(TestInput.getVertexConfName(
             TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
     testConf.setBoolean(TestInput.getVertexConfName(
             TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), failAndExit);
     testConf.set(TestInput.getVertexConfName(
-            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1");
+            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0");
     testConf.set(TestInput.getVertexConfName(
             TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
     testConf.set(TestInput.getVertexConfName(
-            TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "0");
+            TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
     testConf.setInt(TestInput.getVertexConfName(
             TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"),
             0);
@@ -483,7 +513,7 @@ public class TestAnalyzer {
   @Test (timeout=60000)
   public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
     Configuration testConf = new Configuration(false);
-    setCascadingInputFailureConfig(testConf, false);
+    setCascadingInputFailureConfig(testConf, false, 1);
 
     StepCheck[] check = {
         createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
@@ -514,7 +544,7 @@ public class TestAnalyzer {
   @Test (timeout=60000)
   public void testCascadingInputFailureWithExitSuccess() throws Exception {
     Configuration testConf = new Configuration(false);
-    setCascadingInputFailureConfig(testConf, true);
+    setCascadingInputFailureConfig(testConf, true, 1);
     
     StepCheck[] check = {
         createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
@@ -532,6 +562,39 @@ public class TestAnalyzer {
   }
   
   /**
+   * 1 NM is running and can run 4 containers based on YARN mini cluster defaults and 
+   * Tez defaults for AM/task memory
+   * v3 task0 reports read errors against both tasks of v2. This re-starts both of them.
+   * Now all 4 slots are occupied 1 AM + 3 tasks
+   * Now retries of v2 report read error against 1 task of v1. That re-starts.
+   * Retry of v1 task has no space - so it preempts the least priority task (current tez logic)
+   * v3 is preempted and re-run. Shows up on critical path as preempted failure.
+   * Also, the notes on the v1 retry attempt show that it waited for the preemption of v3
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testInternalPreemption() throws Exception {
+    Configuration testConf = new Configuration(false);
+    setCascadingInputFailureConfig(testConf, false, 2);
+
+    StepCheck[] check = {
+        createStep("v1 : 00000[01]_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 00000[01]_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY, 
+            TaskAttemptTerminationCause.INTERNAL_PREEMPTION, null),
+        createStep("v2 : 00000[01]_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY,
+            null, Collections.singletonList("preemption of v3")),
+        createStep("v2 : 00000[01]_1", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY)
+    };
+    
+    DAG dag = SimpleTestDAG3Vertices.createDAG(
+              "testInternalPreemption", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+  }
+  
+  /**
    * Input failure of v3 causes rerun of both both v1 and v2 vertices. 
    *   v1  v2
    *    \ /