You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/12 02:31:51 UTC
tez git commit: TEZ-2810. Support for showing allocation delays due
to internal preemption (bikas)
Repository: tez
Updated Branches:
refs/heads/master e1fc9cc4f -> b6f15dcdc
TEZ-2810. Support for showing allocation delays due to internal preemption (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b6f15dcd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b6f15dcd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b6f15dcd
Branch: refs/heads/master
Commit: b6f15dcdcf3cae841c361983f32229858ff79f42
Parents: e1fc9cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 11 17:31:34 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 11 17:31:34 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../history/parser/datamodel/VertexInfo.java | 1 +
.../analyzer/plugins/CriticalPathAnalyzer.java | 176 ++++++++++++-------
.../org/apache/tez/analyzer/utils/SVGUtils.java | 48 +++--
.../org/apache/tez/analyzer/TestAnalyzer.java | 95 ++++++++--
5 files changed, 226 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00b8282..60010d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2810. Support for showing allocation delays due to internal preemption
TEZ-2808. Race condition between preemption and container assignment
TEZ-2807. Log data in the finish event instead of the start event
TEZ-2799. SimpleHistoryParser NPE
http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index 94547d4..7259667 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -403,6 +403,7 @@ public class VertexInfo extends BaseInfo {
return Collections.unmodifiableList(outputVertices);
}
+ // expensive method to call for large DAGs as it creates big lists on every call
private List<TaskAttemptInfo> getTaskAttemptsInternal() {
List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
for (TaskInfo taskInfo : getTasks()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 350f783..3eb7701 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -33,11 +33,13 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent
import org.apache.tez.analyzer.utils.SVGUtils;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.history.parser.datamodel.Container;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -112,13 +114,15 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
TaskAttemptInfo lastAttempt = null;
long lastAttemptFinishTime = 0;
for (VertexInfo vertex : dagInfo.getVertices()) {
- for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
- attempts.put(attempt.getTaskAttemptId(), attempt);
- if (attempt.getStatus().equals(succeededState) ||
- attempt.getStatus().equals(failedState)) {
- if (lastAttemptFinishTime < attempt.getFinishTime()) {
- lastAttempt = attempt;
- lastAttemptFinishTime = attempt.getFinishTime();
+ for (TaskInfo task : vertex.getTasks()) {
+ for (TaskAttemptInfo attempt : task.getTaskAttempts()) {
+ attempts.put(attempt.getTaskAttemptId(), attempt);
+ if (attempt.getStatus().equals(succeededState) ||
+ attempt.getStatus().equals(failedState)) {
+ if (lastAttemptFinishTime < attempt.getFinishTime()) {
+ lastAttempt = attempt;
+ lastAttemptFinishTime = attempt.getFinishTime();
+ }
}
}
}
@@ -149,68 +153,112 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
}
- private void analyzeCriticalPath(DagInfo dag) {
- if (!criticalPath.isEmpty()) {
- System.out.println("Walking critical path for dag " + dag.getDagId());
- long dagStartTime = dag.getStartTime();
- long dagTime = dag.getFinishTime() - dagStartTime;
- long totalAttemptCriticalTime = 0;
- for (int i = 0; i < criticalPath.size(); ++i) {
- CriticalPathStep step = criticalPath.get(i);
- totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
- TaskAttemptInfo attempt = step.attempt;
- if (step.getType() == EntityType.ATTEMPT) {
- // analyze execution overhead
- long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
- .getAvgExecutionTimeInterval();
- if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
- step.notes
- .add("Potential straggler. Execution time " +
- SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
- + " compared to vertex average of " +
- SVGUtils.getTimeStr(avgExecutionTime));
+ private void analyzeAllocationOverhead(DagInfo dag) {
+ List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
+ for (VertexInfo v : dag.getVertices()) {
+ for (TaskInfo t : v.getTasks()) {
+ for (TaskAttemptInfo a : t.getTaskAttempts()) {
+ if (a.getTerminationCause().equals(
+ TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name())) {
+ System.out.println("Found preempted attempt " + a.getTaskAttemptId());
+ preemptedAttempts.add(a);
}
-
- if (attempt.getStartTime() > step.startCriticalPathTime) {
- // the attempt is critical before launching. So allocation overhead needs analysis
- // analyzer allocation overhead
- Container container = attempt.getContainer();
- if (container != null) {
- Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
- if (attempts != null && !attempts.isEmpty()) {
- // arrange attempts by allocation time
- List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
- Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
- // walk the list to record allocation time before the current attempt
- long containerPreviousAllocatedTime = 0;
- for (TaskAttemptInfo containerAttempt : attemptsList) {
- if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
- break;
- }
- System.out.println("Container: " + container.getId() + " running att: " +
- containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
- containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
- }
- if (containerPreviousAllocatedTime == 0) {
- step.notes.add("Container " + container.getId() + " newly allocated.");
- } else {
- if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
- step.notes.add("Container " + container.getId() + " was fully allocated");
- } else {
- step.notes.add("Container " + container.getId() + " allocated for " +
- SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
- SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) +
- " of allocation wait time");
- }
- }
- }
+ }
+ }
+ }
+ for (int i = 0; i < criticalPath.size(); ++i) {
+ CriticalPathStep step = criticalPath.get(i);
+ TaskAttemptInfo attempt = step.attempt;
+ if (step.getType() != EntityType.ATTEMPT) {
+ continue;
+ }
+
+ long creationTime = attempt.getCreationTime();
+ long allocationTime = attempt.getAllocationTime();
+ if (allocationTime < step.startCriticalPathTime) {
+ // allocated before it became critical
+ continue;
+ }
+
+ // the attempt is critical before allocation. So allocation overhead needs analysis
+ Container container = attempt.getContainer();
+ if (container != null) {
+ Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container);
+ if (attempts != null && !attempts.isEmpty()) {
+ // arrange attempts by allocation time
+ List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts);
+ Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
+ // walk the list to record allocation time before the current attempt
+ long containerPreviousAllocatedTime = 0;
+ for (TaskAttemptInfo containerAttempt : attemptsList) {
+ if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
+ break;
+ }
+ System.out.println("Container: " + container.getId() + " running att: " +
+ containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
+ containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
+ }
+ if (containerPreviousAllocatedTime == 0) {
+ step.notes.add("Container " + container.getId() + " newly allocated.");
+ } else {
+ if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
+ step.notes.add("Container " + container.getId() + " was fully allocated");
+ } else {
+ step.notes.add("Container " + container.getId() + " allocated for " +
+ SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
+ SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) +
+ " of allocation wait time");
}
}
}
+ // look for internal preemptions while attempt was waiting for allocation
+ for (TaskAttemptInfo a : preemptedAttempts) {
+ if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime){
+ // found an attempt that was preempted within this time interval
+ step.notes.add("Potentially waited for preemption of " + a.getShortName());
+ }
+ }
}
- System.out
- .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
- + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
+ }
+ }
+
+ private void analyzeStragglers(DagInfo dag) {
+ long dagStartTime = dag.getStartTime();
+ long dagTime = dag.getFinishTime() - dagStartTime;
+ long totalAttemptCriticalTime = 0;
+ for (int i = 0; i < criticalPath.size(); ++i) {
+ CriticalPathStep step = criticalPath.get(i);
+ totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
+ TaskAttemptInfo attempt = step.attempt;
+ if (step.getType() == EntityType.ATTEMPT) {
+ // analyze execution overhead
+ if (attempt.getLastDataEvents().size() > 1) {
+ // there were read errors. that could have delayed the attempt. ignore this
+ continue;
+ }
+ long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
+ .getAvgExecutionTimeInterval();
+ if (avgExecutionTime <= 0) {
+ continue;
+ }
+ if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
+ step.notes
+ .add("Potential straggler. Execution time " +
+ SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
+ + " compared to vertex average of " +
+ SVGUtils.getTimeStr(avgExecutionTime));
+ }
+ }
+ }
+ System.out
+ .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
+ + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
+ }
+
+ private void analyzeCriticalPath(DagInfo dag) {
+ if (!criticalPath.isEmpty()) {
+ analyzeStragglers(dag);
+ analyzeAllocationOverhead(dag);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 61b1676..88a2105 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -50,10 +50,11 @@ public class SVGUtils {
private static final int STEP_GAP = 50;
private static final int TEXT_SIZE = 20;
private static final String RUNTIME_COLOR = "LightGreen";
+ private static final String ERROR_COLOR = "Tomato";
private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod";
private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon";
private static final String BORDER_COLOR = "Sienna";
- private static final String VERTEX_INIT_COMMIT_COLOR = "orange";
+ private static final String VERTEX_INIT_COMMIT_COLOR = "LightSalmon";
private static final String CRITICAL_COLOR = "IndianRed";
private static final float RECT_OPACITY = 1.0f;
private static final String TITLE_BR = " ";
@@ -162,8 +163,10 @@ public class SVGUtils {
int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime);
- int allocationTimeInterval = (int) (attempt.getAllocationTime() - dagStartTime);
- int launchTimeInterval = (int) (attempt.getStartTime() - dagStartTime);
+ int allocationTimeInterval = attempt.getAllocationTime() > 0 ?
+ (int) (attempt.getAllocationTime() - dagStartTime) : 0;
+ int launchTimeInterval = attempt.getStartTime() > 0 ?
+ (int) (attempt.getStartTime() - dagStartTime) : 0;
int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime);
System.out.println(attempt.getTaskAttemptId() + " " + creationTimeInterval + " "
+ allocationTimeInterval + " " + launchTimeInterval + " " + finishTimeInterval);
@@ -178,22 +181,37 @@ public class SVGUtils {
title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR);
title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR);
title.append("Created at: " + getTimeStr(creationTimeInterval)).append(TITLE_BR);
- title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR);
- title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR);
+ if (allocationTimeInterval > 0) {
+ title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR);
+ }
+ if (launchTimeInterval > 0) {
+ title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR);
+ }
title.append("Finished at: " + getTimeStr(finishTimeInterval)).append(TITLE_BR);
title.append(Joiner.on(TITLE_BR).join(step.getNotes()));
String titleStr = title.toString();
- addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval,
- yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
- titleStr);
-
- addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval,
- yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
- titleStr);
-
- addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP,
- STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+ // handle cases when attempt fails before allocation or launch
+ if (allocationTimeInterval > 0) {
+ addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval,
+ yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+ titleStr);
+ if (launchTimeInterval > 0) {
+ addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval,
+ yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+ titleStr);
+ addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP,
+ STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+ } else {
+ // no launch - so allocate to finish drawn
+ addRectStr(allocationTimeInterval, finishTimeInterval - allocationTimeInterval, yOffset * STEP_GAP,
+ STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+ }
+ } else {
+ // no allocation - so create to finish drawn
+ addRectStr(creationTimeInterval, finishTimeInterval - creationTimeInterval, yOffset * STEP_GAP,
+ STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+ }
addTextStr((finishTimeInterval + creationTimeInterval) / 2,
(yOffset * STEP_GAP + STEP_GAP / 2), attempt.getShortName(), "middle", TEXT_SIZE,
http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index f3a69a6..f54a15a 100644
--- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.history.ATSImportTool;
import org.apache.tez.history.parser.ATSFileParser;
@@ -136,7 +137,7 @@ public class TestAnalyzer {
conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
miniTezCluster =
- new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 4, 1, 1, true);
+ new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 1, 1, 1, true);
miniTezCluster.init(conf);
miniTezCluster.start();
@@ -161,15 +162,26 @@ public class TestAnalyzer {
}
StepCheck createStep(String attempt, CriticalPathDependency reason) {
- return new StepCheck(attempt, reason);
+ return createStep(attempt, reason, null, null);
+ }
+
+ StepCheck createStep(String attempt, CriticalPathDependency reason,
+ TaskAttemptTerminationCause errCause, List<String> notes) {
+ return new StepCheck(attempt, reason, errCause, notes);
}
class StepCheck {
String attempt; // attempt is the TaskAttemptInfo short name with regex
CriticalPathDependency reason;
- StepCheck(String attempt, CriticalPathDependency reason) {
+ TaskAttemptTerminationCause errCause;
+ List<String> notesStr;
+
+ StepCheck(String attempt, CriticalPathDependency reason,
+ TaskAttemptTerminationCause cause, List<String> notes) {
this.attempt = attempt;
this.reason = reason;
+ this.errCause = cause;
+ this.notesStr = notes;
}
String getAttemptDetail() {
return attempt;
@@ -177,6 +189,12 @@ public class TestAnalyzer {
CriticalPathDependency getReason() {
return reason;
}
+ TaskAttemptTerminationCause getErrCause() {
+ return errCause;
+ }
+ List<String> getNotesStr() {
+ return notesStr;
+ }
}
DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception {
@@ -232,7 +250,8 @@ public class TestAnalyzer {
for (CriticalPathStep step : criticalPath) {
LOG.info("ABC Step: " + step.getType());
if (step.getType() == EntityType.ATTEMPT) {
- LOG.info("ABC Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
+ LOG.info("ABC Attempt: " + step.getAttempt().getShortName()
+ + " " + step.getAttempt().getDetailedStatus());
}
LOG.info("ABC Reason: " + step.getReason());
String notes = Joiner.on(";").join(step.getNotes());
@@ -248,12 +267,22 @@ public class TestAnalyzer {
criticalPath.get(0).getAttempt().getShortName());
for (int i=1; i<criticalPath.size() - 1; ++i) {
+ StepCheck check = steps[i-1];
CriticalPathStep step = criticalPath.get(i);
Assert.assertEquals(CriticalPathStep.EntityType.ATTEMPT, step.getType());
- Assert.assertTrue(steps[i-1].getAttemptDetail(),
- step.getAttempt().getShortName().matches(steps[i-1].getAttemptDetail()));
- //Assert.assertEquals(steps[i-1].getAttemptDetail(), step.getAttempt().getShortName());
+ Assert.assertTrue(check.getAttemptDetail(),
+ step.getAttempt().getShortName().matches(check.getAttemptDetail()));
Assert.assertEquals(steps[i-1].getReason(), step.getReason());
+ if (check.getErrCause() != null) {
+ Assert.assertEquals(check.getErrCause(),
+ TaskAttemptTerminationCause.valueOf(step.getAttempt().getTerminationCause()));
+ }
+ if (check.getNotesStr() != null) {
+ String notes = Joiner.on("#").join(step.getNotes());
+ for (String note : check.getNotesStr()) {
+ Assert.assertTrue(note, notes.contains(note));
+ }
+ }
}
Assert.assertEquals(CriticalPathStep.EntityType.DAG_COMMIT,
@@ -435,16 +464,17 @@ public class TestAnalyzer {
* @param failAndExit whether input failure should trigger attempt exit
*/
private void setCascadingInputFailureConfig(Configuration testConf,
- boolean failAndExit) {
+ boolean failAndExit,
+ int numTasks) {
// v2 attempt0 succeeds.
- // v2 task0 attempt1 input0 fails up to version 0.
- testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+ // v2 all tasks attempt1 input0 fail up to version 0.
+ testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, numTasks);
testConf.setBoolean(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.setBoolean(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), failAndExit);
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "1");
testConf.set(TestInput.getVertexConfName(
@@ -453,17 +483,17 @@ public class TestAnalyzer {
TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"),
0);
- //v3 all-tasks attempt0 input0 fails up to version 0.
+ //v3 task0 attempt0 all inputs fail up to version 0.
testConf.setBoolean(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
testConf.setBoolean(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), failAndExit);
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1");
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "0");
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
testConf.setInt(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"),
0);
@@ -483,7 +513,7 @@ public class TestAnalyzer {
@Test (timeout=60000)
public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
Configuration testConf = new Configuration(false);
- setCascadingInputFailureConfig(testConf, false);
+ setCascadingInputFailureConfig(testConf, false, 1);
StepCheck[] check = {
createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
@@ -514,7 +544,7 @@ public class TestAnalyzer {
@Test (timeout=60000)
public void testCascadingInputFailureWithExitSuccess() throws Exception {
Configuration testConf = new Configuration(false);
- setCascadingInputFailureConfig(testConf, true);
+ setCascadingInputFailureConfig(testConf, true, 1);
StepCheck[] check = {
createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
@@ -532,6 +562,39 @@ public class TestAnalyzer {
}
/**
+ * 1 NM is running and can run 4 containers based on YARN mini cluster defaults and
+ * Tez defaults for AM/task memory
+ * v3 task0 reports read errors against both tasks of v2. This re-starts both of them.
+ * Now all 4 slots are occupied 1 AM + 3 tasks
+ * Now retries of v2 report read error against 1 task of v1. That re-starts.
+ * Retry of v1 task has no space - so it preempts the least priority task (current tez logic)
+ * v3 is preempted and re-run. Shows up on critical path as preempted failure.
+ * Also, the notes on the v1 retry attempt show that it caused preemption of v3
+ * @throws Exception
+ */
+ @Test (timeout=60000)
+ public void testInternalPreemption() throws Exception {
+ Configuration testConf = new Configuration(false);
+ setCascadingInputFailureConfig(testConf, false, 2);
+
+ StepCheck[] check = {
+ createStep("v1 : 00000[01]_0", CriticalPathDependency.INIT_DEPENDENCY),
+ createStep("v2 : 00000[01]_0", CriticalPathDependency.DATA_DEPENDENCY),
+ createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY,
+ TaskAttemptTerminationCause.INTERNAL_PREEMPTION, null),
+ createStep("v2 : 00000[01]_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+ createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY,
+ null, Collections.singletonList("preemption of v3")),
+ createStep("v2 : 00000[01]_1", CriticalPathDependency.DATA_DEPENDENCY),
+ createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY)
+ };
+
+ DAG dag = SimpleTestDAG3Vertices.createDAG(
+ "testInternalPreemption", testConf);
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check));
+ }
+
+ /**
* Input failure of v3 causes rerun of both v1 and v2 vertices.
* v1 v2
* \ /