You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/10/27 14:37:53 UTC

tez git commit: TEZ-2888. Make critical path calculation resilient to AM crash (bikas)

Repository: tez
Updated Branches:
  refs/heads/master c35e5cc86 -> fede4c771


TEZ-2888. Make critical path calculation resilient to AM crash (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fede4c77
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fede4c77
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fede4c77

Branch: refs/heads/master
Commit: fede4c7713bfa34c825fbc9dc532475f98b67e2e
Parents: c35e5cc
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Oct 27 06:37:28 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Oct 27 06:37:28 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../parser/datamodel/TaskAttemptInfo.java       |  14 ++
 .../history/parser/datamodel/VertexInfo.java    |  20 +-
 .../analyzer/plugins/CriticalPathAnalyzer.java  | 185 +++++++++++++++++--
 .../org/apache/tez/analyzer/utils/SVGUtils.java |   8 +-
 5 files changed, 202 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69746ac..02df677 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2888. Make critical path calculation resilient to AM crash
   TEZ-2899. Tez UI: DAG getting created with huge horizontal gap in between vertices
   TEZ-2907. NPE in IFile.Reader.getLength during final merge operation
   TEZ-2903. Stop using proprietary APIs in RPCLoadGen.

http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index acbefea..d373513 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -162,6 +162,20 @@ public class TaskAttemptInfo extends BaseInfo {
   public final long getExecutionTimeInterval() {
     return executionTimeInterval;
   }
+  
+  /** Finish time minus the later of start time and last data event time; -1 if not known. */
+  public final long getPostDataExecutionTimeInterval() {
+    if (getStartTime() <= 0 || getFinishTime() <= 0) {
+      return -1; // start or finish time not recorded
+    }
+    long postDataStartTime = startTime;
+    if (getLastDataEvents() != null && !getLastDataEvents().isEmpty()) {
+      // only a last data event that arrived after the start moves the interval's beginning
+      long lastEventTime = getLastDataEvents().get(getLastDataEvents().size() - 1).getTimestamp();
+      postDataStartTime = Math.max(startTime, lastEventTime);
+    }
+    return getFinishTime() - postDataStartTime;
+  }
 
   public final long getAllocationToEndTimeInterval() {
     return (endTime - allocationTime);

http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index 7259667..50647fe 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -75,7 +75,7 @@ public class VertexInfo extends BaseInfo {
   private final List<AdditionalInputOutputDetails> additionalInputInfoList;
   private final List<AdditionalInputOutputDetails> additionalOutputInfoList;
   
-  private long avgExecutionTimeInterval = -1;
+  private long avgPostDataExecutionTimeInterval = -1;
 
   private DagInfo dagInfo;
 
@@ -86,7 +86,7 @@ public class VertexInfo extends BaseInfo {
         jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
             (Constants.TEZ_VERTEX_ID));
 
-    vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY_TYPE));
+    vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
     taskInfoMap = Maps.newHashMap();
 
     inEdgeList = Lists.newLinkedList();
@@ -195,22 +195,26 @@ public class VertexInfo extends BaseInfo {
     return getLastTaskToFinish().getFinishTimeInterval();
   }
   
-  public final long getAvgExecutionTimeInterval() {
-    if (avgExecutionTimeInterval == -1) {
+  public final long getAvgPostDataExecutionTimeInterval() { // rounded mean post-data execution time of successful attempts
+    if (avgPostDataExecutionTimeInterval == -1) { // -1: not cached yet; stays -1 when no attempt qualifies
       long totalExecutionTime = 0;
       long totalAttempts = 0;
       for (TaskInfo task : getTasks()) {
         TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt();
         if (attempt != null) {
-          totalExecutionTime += attempt.getExecutionTimeInterval();
-          totalAttempts++;
+          // count only time after last data was received
+          long execTime = attempt.getPostDataExecutionTimeInterval();
+          if (execTime >= 0) { // skips -1 (missing start/finish time) and any inconsistent negative interval
+            totalExecutionTime += execTime;
+            totalAttempts++;
+          }
         }
       }
       if (totalAttempts > 0) {
-        avgExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts);
+        avgPostDataExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts);
       }
     }
-    return avgExecutionTimeInterval;
+    return avgPostDataExecutionTimeInterval;
   }
 
   public final long getStartTime() {

http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 4062142..d4efdf9 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -19,6 +19,7 @@
 package org.apache.tez.analyzer.plugins;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -104,6 +105,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
   
   Map<String, TaskAttemptInfo> attempts = Maps.newHashMap();
 
+  int maxConcurrency = 0;
+  ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();
+
   public CriticalPathAnalyzer() {
   }
 
@@ -153,6 +157,92 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
     svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
   }
   
+  // A point on the DAG timeline: one attempt start (start == true) or finish event.
+  // Ordering and equality look only at the timestamp. For the entry kept per distinct
+  // timestamp in concurrencyByTime, count is the number of attempts running at that time.
+  static class TimeInfo implements Comparable<TimeInfo> {
+    long timestamp;
+    int count;
+    boolean start;
+
+    TimeInfo(long timestamp, boolean start) {
+      this.timestamp = timestamp;
+      this.start = start;
+    }
+
+    @Override
+    public int compareTo(TimeInfo other) {
+      return Long.compare(timestamp, other.timestamp);
+    }
+
+    @Override
+    public int hashCode() {
+      // low 32 bits XOR high 32 bits, the same fold java.lang.Long#hashCode uses
+      return (int) (timestamp ^ (timestamp >>> 32));
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      // same concrete class and same timestamp, consistent with compareTo
+      if (obj == null || obj.getClass() != getClass()) {
+        return false;
+      }
+      return compareTo((TimeInfo) obj) == 0;
+    }
+  }
+  
+  // Fills concurrencyByTime with one entry per distinct attempt start/finish timestamp; each
+  // entry's count is the number of attempts running once every event at that timestamp is
+  // applied. maxConcurrency records the highest running count reached during the replay.
+  private void determineConcurrency(DagInfo dag) {
+    ArrayList<TimeInfo> timeInfo = Lists.newArrayList();
+    for (VertexInfo v : dag.getVertices()) {
+      for (TaskInfo t : v.getTasks()) {
+        for (TaskAttemptInfo a : t.getTaskAttempts()) {
+          if (a.getStartTime() > 0) {
+            timeInfo.add(new TimeInfo(a.getStartTime(), true));
+            // no finish time (e.g. AM crash): leave the attempt running instead of adding an
+            // end event at a non-positive time, which would sort before every start
+            if (a.getFinishTime() > 0) {
+              timeInfo.add(new TimeInfo(a.getFinishTime(), false));
+            }
+          }
+        }
+      }
+    }
+    Collections.sort(timeInfo);
+    int concurrency = 0;
+    TimeInfo lastTimeInfo = null;
+    for (TimeInfo t : timeInfo) {
+      concurrency += t.start ? 1 : -1;
+      maxConcurrency = Math.max(maxConcurrency, concurrency);
+      if (lastTimeInfo == null || lastTimeInfo.timestamp < t.timestamp) {
+        lastTimeInfo = t; // the first event at a new timestamp represents that timestamp
+        concurrencyByTime.add(lastTimeInfo);
+      }
+      lastTimeInfo.count = concurrency; // later events at the same timestamp overwrite it
+    }
+  }
+  
+  // Returns the highest number of concurrently running attempts at any instant in
+  // [begin, end], counting the level already in effect at begin. A non-positive end
+  // (no recorded finish time) leaves the interval open-ended.
+  private int getIntervalMaxConcurrency(long begin, long end) {
+    int concurrency = 0;
+    for (TimeInfo timeInfo : concurrencyByTime) {
+      if (end > 0 && timeInfo.timestamp > end) {
+        break;
+      }
+      // entries up to begin only establish the level in effect when the interval opens
+      concurrency = timeInfo.timestamp <= begin
+          ? timeInfo.count : Math.max(concurrency, timeInfo.count);
+    }
+    return concurrency;
+  }
+  
   private void analyzeAllocationOverhead(DagInfo dag) {
     List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
     for (VertexInfo v : dag.getVertices()) {
@@ -175,6 +265,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
       
       long creationTime = attempt.getCreationTime();
       long allocationTime = attempt.getAllocationTime();
+      long finishTime = attempt.getFinishTime();
       if (allocationTime < step.startCriticalPathTime) {
         // allocated before it became critical
         continue;
@@ -190,7 +281,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
           Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
           // walk the list to record allocation time before the current attempt
           long containerPreviousAllocatedTime = 0;
-          int wavesForVertex = 1;
+          int reUsesForVertex = 1;
           for (TaskAttemptInfo containerAttempt : attemptsList) {
             if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
               break;
@@ -199,14 +290,28 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
                 attempt.getTaskInfo().getVertexInfo().getVertexId())) {
               // another task from the same vertex ran in this container. So there are multiple 
               // waves for this vertex on this container.
-              wavesForVertex++;
+              reUsesForVertex++;
+            }
+            long cAllocTime = containerAttempt.getAllocationTime();
+            long cFinishTime = containerAttempt.getFinishTime();
+            if (cFinishTime > creationTime) {
+              // for containerAttempts that used the container while this attempt was waiting
+              // add up time container was allocated to containerAttempt. Account for allocations
+              // that started before this attempt was created.
+              containerPreviousAllocatedTime += 
+                  (cFinishTime - (cAllocTime > creationTime ? cAllocTime : creationTime));
             }
-            System.out.println("Container: " + container.getId() + " running att: " + 
-            containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
-            containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
           }
-          if (wavesForVertex > 1) {
-            step.notes.add("Container ran multiple waves for this vertex.");
+          int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
+          int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime);
+          double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
+          
+          if (reUsesForVertex > 1) {
+            step.notes.add("Container ran multiple tasks for this vertex. ");
+            if (numWaves < 1) {
+              // less than 1 wave total but still ran more than 1 on this container
+              step.notes.add("Vertex potentially seeing contention from other branches in the DAG. ");
+            }
           }
           if (containerPreviousAllocatedTime == 0) {
             step.notes.add("Container newly allocated.");
@@ -223,7 +328,12 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
         }
         // look for internal preemptions while attempt was waiting for allocation
         for (TaskAttemptInfo a : preemptedAttempts) {
-          if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime){
+          if (a.getTaskInfo().getVertexInfo().getVertexId()
+              .equals(attempt.getTaskInfo().getVertexInfo().getVertexId())) {
+            // dont preempt same vertex task. ideally this should look at priority but we dont have it
+            continue;
+          }
+          if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime) {
             // found an attempt that was preempted within this time interval
             step.notes.add("Potentially waited for preemption of " + a.getShortName());
           }
@@ -232,6 +342,41 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
     }
   }
   
+  private double getWaves(int numTasks, int concurrency) {
+    // tasks / concurrency to 1 decimal place; NaN when concurrency is unknown (<= 0)
+    return concurrency <= 0 ? Double.NaN
+        : (double) Math.round((numTasks * 1.0) / concurrency * 10d) / 10d;
+  }
+  
+  // For each critical-path attempt of a multi-task vertex, notes how many waves the vertex needed
+  // given the peak concurrency seen between the attempt's creation and finish.
+  private void analyzeWaves(DagInfo dag) {
+    for (int i = 0; i < criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i);
+      if (step.getType() != EntityType.ATTEMPT) {
+        continue;
+      }
+      TaskAttemptInfo attempt = step.attempt;
+      int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
+      if (numVertexTasks <= 1) {
+        continue;
+      }
+      int intervalMaxConcurrency = getIntervalMaxConcurrency(attempt.getCreationTime(), attempt.getFinishTime());
+      if (intervalMaxConcurrency <= 0) {
+        continue; // no concurrency data for this interval, so the wave count is undefined
+      }
+      double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
+      step.notes.add("Vertex ran " + numVertexTasks
+          + " tasks in " + numWaves
+          + " waves with available concurrency of " + intervalMaxConcurrency);
+      double lastWaveFraction = numWaves % 1;
+      // a whole number of waves (fraction 0) has no partial last wave
+      if (numWaves > 1 && lastWaveFraction > 0 && lastWaveFraction < 0.5) {
+        step.notes.add("Last partial wave did not use full concurrency. ");
+      }
+    }
+  }
+  
   private void analyzeStragglers(DagInfo dag) {
     long dagStartTime = dag.getStartTime();
     long dagTime = dag.getFinishTime() - dagStartTime;
@@ -246,17 +391,18 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
           // there were read errors. that could have delayed the attempt. ignore this
           continue;
         }
-        long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
-            .getAvgExecutionTimeInterval();
-        if (avgExecutionTime <= 0) {
+        long avgPostDataExecutionTime = attempt.getTaskInfo().getVertexInfo()
+            .getAvgPostDataExecutionTimeInterval();
+        if (avgPostDataExecutionTime <= 0) {
           continue;
         }
-        if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
+        long attemptExecTime = attempt.getPostDataExecutionTimeInterval();
+        if (avgPostDataExecutionTime * 1.25 < attemptExecTime) {
           step.notes
-              .add("Potential straggler. Execution time " + 
-                  SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
+              .add("Potential straggler. Post Data Execution time " + 
+                  SVGUtils.getTimeStr(attemptExecTime)
                   + " compared to vertex average of " + 
-                  SVGUtils.getTimeStr(avgExecutionTime));
+                  SVGUtils.getTimeStr(avgPostDataExecutionTime));
         }
       }
     }
@@ -267,7 +413,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
   
   private void analyzeCriticalPath(DagInfo dag) {
     if (!criticalPath.isEmpty()) {
+      determineConcurrency(dag); // builds concurrencyByTime, read (via getIntervalMaxConcurrency) by analyzeWaves and analyzeAllocationOverhead
       analyzeStragglers(dag);
+      analyzeWaves(dag); // adds per-attempt wave notes; relies on determineConcurrency having run
       analyzeAllocationOverhead(dag);
     }
   }
@@ -281,7 +429,12 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
       long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;
 
       // add the commit step
-      currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
+      if (dagInfo.getFinishTime() > 0) {
+        currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
+      } else {
+        // AM crashed and no dag finished written
+        currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+      }
       currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
       currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
       tempCP.add(currentStep);

http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 2e94ec0..78cb921 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -219,11 +219,15 @@ public class SVGUtils {
   }
 
   private void drawCritical(DagInfo dagInfo, List<CriticalPathStep> criticalPath) {
-    int duration = (int) dagInfo.getFinishTimeInterval();
-    MAX_DAG_RUNTIME = duration;
     long dagStartTime = dagInfo.getStartTime();
     int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time
     int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime);
+    if (dagInfo.getFinishTime() <= 0) {
+      // AM crashed. no dag finish time written
+      dagFinishTimeInterval =(int) (criticalPath.get(criticalPath.size()-1).getStopCriticalTime()
+          - dagStartTime);
+    }
+    MAX_DAG_RUNTIME = dagFinishTimeInterval;
     
     // draw grid
     addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK);