You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/10/27 14:37:53 UTC
tez git commit: TEZ-2888. Make critical path calculation resilient to
AM crash (bikas)
Repository: tez
Updated Branches:
refs/heads/master c35e5cc86 -> fede4c771
TEZ-2888. Make critical path calculation resilient to AM crash (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fede4c77
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fede4c77
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fede4c77
Branch: refs/heads/master
Commit: fede4c7713bfa34c825fbc9dc532475f98b67e2e
Parents: c35e5cc
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Oct 27 06:37:28 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Oct 27 06:37:28 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../parser/datamodel/TaskAttemptInfo.java | 14 ++
.../history/parser/datamodel/VertexInfo.java | 20 +-
.../analyzer/plugins/CriticalPathAnalyzer.java | 185 +++++++++++++++++--
.../org/apache/tez/analyzer/utils/SVGUtils.java | 8 +-
5 files changed, 202 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69746ac..02df677 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2888. Make critical path calculation resilient to AM crash
TEZ-2899. Tez UI: DAG getting created with huge horizontal gap in between vertices
TEZ-2907. NPE in IFile.Reader.getLength during final merge operation
TEZ-2903. Stop using proprietary APIs in RPCLoadGen.
http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index acbefea..d373513 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -162,6 +162,20 @@ public class TaskAttemptInfo extends BaseInfo {
public final long getExecutionTimeInterval() {
return executionTimeInterval;
}
+
+ /**
+  * Time the attempt spent running after its input data was available.
+  * The interval is measured from the later of the attempt start time and the timestamp of
+  * the final entry in the last-data-events list (when that list is present and non-empty)
+  * up to the attempt finish time.
+  *
+  * @return the interval, or -1 when the start or finish time is not positive
+  */
+ public final long getPostDataExecutionTimeInterval() {
+   if (getStartTime() <= 0 || getFinishTime() <= 0) {
+     // one of the two timestamps is missing, nothing meaningful to compute
+     return -1;
+   }
+   // by default the clock starts at the actual start time
+   long postDataStartTime = startTime;
+   if (getLastDataEvents() != null && !getLastDataEvents().isEmpty()) {
+     // the final element of the list is taken as the last data event
+     int lastIndex = getLastDataEvents().size() - 1;
+     long lastEventTime = getLastDataEvents().get(lastIndex).getTimestamp();
+     // data that arrived before the attempt started does not move the clock
+     postDataStartTime = Math.max(startTime, lastEventTime);
+   }
+   return getFinishTime() - postDataStartTime;
+ }
public final long getAllocationToEndTimeInterval() {
return (endTime - allocationTime);
http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index 7259667..50647fe 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -75,7 +75,7 @@ public class VertexInfo extends BaseInfo {
private final List<AdditionalInputOutputDetails> additionalInputInfoList;
private final List<AdditionalInputOutputDetails> additionalOutputInfoList;
- private long avgExecutionTimeInterval = -1;
+ private long avgPostDataExecutionTimeInterval = -1;
private DagInfo dagInfo;
@@ -86,7 +86,7 @@ public class VertexInfo extends BaseInfo {
jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
(Constants.TEZ_VERTEX_ID));
- vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY_TYPE));
+ vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
taskInfoMap = Maps.newHashMap();
inEdgeList = Lists.newLinkedList();
@@ -195,22 +195,26 @@ public class VertexInfo extends BaseInfo {
return getLastTaskToFinish().getFinishTimeInterval();
}
- public final long getAvgExecutionTimeInterval() {
- if (avgExecutionTimeInterval == -1) {
+ public final long getAvgPostDataExecutionTimeInterval() {
+ if (avgPostDataExecutionTimeInterval == -1) {
long totalExecutionTime = 0;
long totalAttempts = 0;
for (TaskInfo task : getTasks()) {
TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt();
if (attempt != null) {
- totalExecutionTime += attempt.getExecutionTimeInterval();
- totalAttempts++;
+ // count only time after last data was received
+ long execTime = attempt.getPostDataExecutionTimeInterval();
+ if (execTime >= 0) {
+ totalExecutionTime += execTime;
+ totalAttempts++;
+ }
}
}
if (totalAttempts > 0) {
- avgExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts);
+ avgPostDataExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts);
}
}
- return avgExecutionTimeInterval;
+ return avgPostDataExecutionTimeInterval;
}
public final long getStartTime() {
http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 4062142..d4efdf9 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -19,6 +19,7 @@
package org.apache.tez.analyzer.plugins;
import java.io.File;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -104,6 +105,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
Map<String, TaskAttemptInfo> attempts = Maps.newHashMap();
+ int maxConcurrency = 0;
+ ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();
+
public CriticalPathAnalyzer() {
}
@@ -153,6 +157,92 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
}
+ /**
+  * A single point on a timeline. Ordering, equality and hashing all depend on the
+  * timestamp only; {@code count} and {@code start} do not take part in them.
+  */
+ static class TimeInfo implements Comparable<TimeInfo> {
+   // time at which the event happened
+   long timestamp;
+   // not set by the constructor; assigned by code that walks the events
+   int count;
+   // flag supplied at construction time to tell start events from finish events
+   boolean start;
+
+   TimeInfo(long timestamp, boolean start) {
+     this.start = start;
+     this.timestamp = timestamp;
+   }
+
+   @Override
+   public int compareTo(TimeInfo o) {
+     return Long.compare(timestamp, o.timestamp);
+   }
+
+   @Override
+   public int hashCode() {
+     // fold the upper half of the timestamp into the lower half
+     long folded = timestamp ^ (timestamp >> 32);
+     return (int) folded;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     if (o == this) {
+       return true;
+     }
+     if (o == null || o.getClass() != getClass()) {
+       return false;
+     }
+     // equal exactly when the timestamps compare as equal
+     return compareTo((TimeInfo) o) == 0;
+   }
+ }
+
+ /**
+  * Builds the concurrency timeline of the DAG from the start and finish times of all task
+  * attempts. On return {@code concurrencyByTime} holds one entry per distinct event
+  * timestamp, whose {@code count} is the number of running attempts after every event at
+  * that timestamp has been applied, and {@code maxConcurrency} holds the highest running
+  * count seen after any single event.
+  *
+  * @param dag the DAG whose vertices, tasks and attempts are scanned
+  */
+ private void determineConcurrency(DagInfo dag) {
+   List<TimeInfo> timeInfo = Lists.newArrayList();
+   for (VertexInfo v : dag.getVertices()) {
+     for (TaskInfo t : v.getTasks()) {
+       for (TaskAttemptInfo a : t.getTaskAttempts()) {
+         if (a.getStartTime() <= 0) {
+           // attempt never started, it does not contribute to concurrency
+           continue;
+         }
+         timeInfo.add(new TimeInfo(a.getStartTime(), true));
+         if (a.getFinishTime() > 0) {
+           timeInfo.add(new TimeInfo(a.getFinishTime(), false));
+         }
+         // An attempt without a recorded finish time (e.g. the AM crashed) gets no finish
+         // event. A finish event with a non-positive timestamp would sort to the front of
+         // the timeline and decrement the running count before any attempt had started.
+       }
+     }
+   }
+   Collections.sort(timeInfo);
+
+   int concurrency = 0;
+   TimeInfo lastTimeInfo = null;
+   for (TimeInfo t : timeInfo) {
+     concurrency += (t.start) ? 1 : -1;
+     maxConcurrency = Math.max(maxConcurrency, concurrency);
+     if (lastTimeInfo == null || lastTimeInfo.timestamp < t.timestamp) {
+       // first event at a new timestamp: it becomes the timeline entry for that time
+       lastTimeInfo = t;
+       lastTimeInfo.count = concurrency;
+       concurrencyByTime.add(lastTimeInfo);
+     } else {
+       // lastTimeInfo.timestamp == t.timestamp: fold into the existing entry
+       lastTimeInfo.count = concurrency;
+     }
+   }
+ }
+
+ /**
+  * Returns the largest {@code count} among the {@code concurrencyByTime} entries whose
+  * timestamp lies in the closed interval [begin, end]. Only entries inside the window are
+  * inspected; entries before {@code begin} are ignored. The scan stops at the first entry
+  * inside or after the window that is later than {@code end}.
+  *
+  * @return the largest count found, or 0 when no entry falls inside the window
+  */
+ private int getIntervalMaxConcurrency(long begin, long end) {
+   int peak = 0;
+   for (TimeInfo point : concurrencyByTime) {
+     long ts = point.timestamp;
+     if (ts < begin) {
+       // not yet inside the window
+       continue;
+     }
+     if (ts > end) {
+       // past the window, stop scanning
+       break;
+     }
+     peak = Math.max(peak, point.count);
+   }
+   return peak;
+ }
+
private void analyzeAllocationOverhead(DagInfo dag) {
List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
for (VertexInfo v : dag.getVertices()) {
@@ -175,6 +265,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
long creationTime = attempt.getCreationTime();
long allocationTime = attempt.getAllocationTime();
+ long finishTime = attempt.getFinishTime();
if (allocationTime < step.startCriticalPathTime) {
// allocated before it became critical
continue;
@@ -190,7 +281,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
// walk the list to record allocation time before the current attempt
long containerPreviousAllocatedTime = 0;
- int wavesForVertex = 1;
+ int reUsesForVertex = 1;
for (TaskAttemptInfo containerAttempt : attemptsList) {
if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
break;
@@ -199,14 +290,28 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
attempt.getTaskInfo().getVertexInfo().getVertexId())) {
// another task from the same vertex ran in this container. So there are multiple
// waves for this vertex on this container.
- wavesForVertex++;
+ reUsesForVertex++;
+ }
+ long cAllocTime = containerAttempt.getAllocationTime();
+ long cFinishTime = containerAttempt.getFinishTime();
+ if (cFinishTime > creationTime) {
+ // for containerAttempts that used the container while this attempt was waiting
+ // add up time container was allocated to containerAttempt. Account for allocations
+ // that started before this attempt was created.
+ containerPreviousAllocatedTime +=
+ (cFinishTime - (cAllocTime > creationTime ? cAllocTime : creationTime));
}
- System.out.println("Container: " + container.getId() + " running att: " +
- containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
- containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
}
- if (wavesForVertex > 1) {
- step.notes.add("Container ran multiple waves for this vertex.");
+ int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
+ int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime);
+ double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
+
+ if (reUsesForVertex > 1) {
+ step.notes.add("Container ran multiple tasks for this vertex. ");
+ if (numWaves < 1) {
+ // less than 1 wave total but still ran more than 1 on this container
+ step.notes.add("Vertex potentially seeing contention from other branches in the DAG. ");
+ }
}
if (containerPreviousAllocatedTime == 0) {
step.notes.add("Container newly allocated.");
@@ -223,7 +328,12 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
}
// look for internal preemptions while attempt was waiting for allocation
for (TaskAttemptInfo a : preemptedAttempts) {
- if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime){
+ if (a.getTaskInfo().getVertexInfo().getVertexId()
+ .equals(attempt.getTaskInfo().getVertexInfo().getVertexId())) {
+ // don't preempt same vertex task. ideally this should look at priority but we don't have it
+ continue;
+ }
+ if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime) {
// found an attempt that was preempted within this time interval
step.notes.add("Potentially waited for preemption of " + a.getShortName());
}
@@ -232,6 +342,41 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
}
}
+ /**
+  * Number of waves needed to run {@code numTasks} tasks at the given concurrency, rounded
+  * to one decimal place. The ratio is computed in floating point, so a concurrency of 0
+  * does not throw but yields a non-finite ratio before rounding.
+  */
+ private double getWaves(int numTasks, int concurrency) {
+   double rawWaves = ((double) numTasks) / concurrency;
+   // scale up, round to the nearest whole number, scale back down: keeps 1 decimal place
+   long tenths = Math.round(rawWaves * 10d);
+   return tenths / 10d;
+ }
+
+ /**
+  * Adds wave information to the notes of every attempt step on the critical path whose
+  * vertex has more than one task. The number of waves is the vertex task count divided by
+  * the maximum concurrency observed between the creation and finish time of the attempt.
+  * Steps for which no concurrency was observed in that interval are left without a note.
+  *
+  * @param dag the DAG being analyzed (not read by this method)
+  */
+ private void analyzeWaves(DagInfo dag) {
+   for (CriticalPathStep step : criticalPath) {
+     if (step.getType() != EntityType.ATTEMPT) {
+       continue;
+     }
+     TaskAttemptInfo attempt = step.attempt;
+     int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
+     if (numVertexTasks <= 1) {
+       // a single task cannot run in waves
+       continue;
+     }
+     int intervalMaxConcurrency =
+         getIntervalMaxConcurrency(attempt.getCreationTime(), attempt.getFinishTime());
+     if (intervalMaxConcurrency <= 0) {
+       // No concurrency data for this interval (e.g. the attempt has no finish time).
+       // Dividing by zero would produce a meaningless wave count, so skip the step.
+       continue;
+     }
+     double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency);
+
+     step.notes.add("Vertex ran " + numVertexTasks
+         + " tasks in " + numWaves
+         + " waves with available concurrency of " + intervalMaxConcurrency);
+     if (numWaves > 1 && numWaves % 1 < 0.5) {
+       // more than 1 wave needed and last wave is small
+       step.notes.add("Last partial wave did not use full concurrency. ");
+     }
+   }
+ }
+
private void analyzeStragglers(DagInfo dag) {
long dagStartTime = dag.getStartTime();
long dagTime = dag.getFinishTime() - dagStartTime;
@@ -246,17 +391,18 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
// there were read errors. that could have delayed the attempt. ignore this
continue;
}
- long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
- .getAvgExecutionTimeInterval();
- if (avgExecutionTime <= 0) {
+ long avgPostDataExecutionTime = attempt.getTaskInfo().getVertexInfo()
+ .getAvgPostDataExecutionTimeInterval();
+ if (avgPostDataExecutionTime <= 0) {
continue;
}
- if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
+ long attemptExecTime = attempt.getPostDataExecutionTimeInterval();
+ if (avgPostDataExecutionTime * 1.25 < attemptExecTime) {
step.notes
- .add("Potential straggler. Execution time " +
- SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
+ .add("Potential straggler. Post Data Execution time " +
+ SVGUtils.getTimeStr(attemptExecTime)
+ " compared to vertex average of " +
- SVGUtils.getTimeStr(avgExecutionTime));
+ SVGUtils.getTimeStr(avgPostDataExecutionTime));
}
}
}
@@ -267,7 +413,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
private void analyzeCriticalPath(DagInfo dag) {
if (!criticalPath.isEmpty()) {
+ determineConcurrency(dag);
analyzeStragglers(dag);
+ analyzeWaves(dag);
analyzeAllocationOverhead(dag);
}
}
@@ -281,7 +429,12 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;
// add the commit step
- currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
+ if (dagInfo.getFinishTime() > 0) {
+ currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
+ } else {
+ // AM crashed and no DAG finish time was written
+ currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+ }
currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
tempCP.add(currentStep);
http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 2e94ec0..78cb921 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -219,11 +219,15 @@ public class SVGUtils {
}
private void drawCritical(DagInfo dagInfo, List<CriticalPathStep> criticalPath) {
- int duration = (int) dagInfo.getFinishTimeInterval();
- MAX_DAG_RUNTIME = duration;
long dagStartTime = dagInfo.getStartTime();
int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time
int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime);
+ if (dagInfo.getFinishTime() <= 0) {
+ // AM crashed. no dag finish time written
+ dagFinishTimeInterval =(int) (criticalPath.get(criticalPath.size()-1).getStopCriticalTime()
+ - dagStartTime);
+ }
+ MAX_DAG_RUNTIME = dagFinishTimeInterval;
// draw grid
addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK);