You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/31 20:11:09 UTC
tez git commit: TEZ-2739. Improve handling of read errors in critical path analyzer (bikas)
Repository: tez
Updated Branches:
refs/heads/master e5a79fd44 -> 5ba6cf9d3
TEZ-2739. Improve handling of read errors in critical path analyzer (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ba6cf9d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ba6cf9d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ba6cf9d
Branch: refs/heads/master
Commit: 5ba6cf9d3cc2812497d939322003ec05227bd672
Parents: e5a79fd
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Aug 31 11:10:57 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Aug 31 11:10:57 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/history/ATSImportTool.java | 2 +-
.../tez/history/parser/datamodel/TaskInfo.java | 9 +-
tez-tools/analyzers/job-analyzer/pom.xml | 109 ++-
.../analyzer/plugins/CriticalPathAnalyzer.java | 138 +++-
.../org/apache/tez/analyzer/utils/SVGUtils.java | 24 +-
.../org/apache/tez/analyzer/TestAnalyzer.java | 662 +++++++++++++++++++
7 files changed, 901 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e145916..029d776 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ ALL CHANGES:
same name
TEZ-2747. Update master to reflect 0.8.0-alpha release.
TEZ-2662. Provide a way to check whether AM or task opts are valid and error if not.
+ TEZ-2739. Improve handling of read errors in critical path analyzer
Release 0.8.0-alpha: 2015-08-29
http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
index 0e53d27..737df76 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
@@ -433,7 +433,7 @@ public class ATSImportTool extends Configured implements Tool {
}
@VisibleForTesting
- static int process(String[] args) {
+ public static int process(String[] args) {
Options options = buildOptions();
int result = -1;
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
index 7a89166..c6f89d6 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.tez.history.parser.datamodel;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
@@ -205,14 +206,14 @@ public class TaskInfo extends BaseInfo {
* @return TaskAttemptInfo
*/
public final TaskAttemptInfo getSuccessfulTaskAttempt() {
- if (isNotNullOrEmpty(getSuccessfulAttemptId())) {
+ if (!Strings.isNullOrEmpty(getSuccessfulAttemptId())) {
for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) {
return attemptInfo;
}
}
}
- // fall back to checking status if successful attemt id is not available
+ // fall back to checking status if successful attempt id is not available
for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) {
return attemptInfo;
@@ -350,8 +351,4 @@ public class TaskInfo extends BaseInfo {
sb.append("]");
return sb.toString();
}
-
- private static boolean isNotNullOrEmpty(String str) {
- return str != null && !str.isEmpty();
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml
index 13cf48d..37a06bf 100644
--- a/tez-tools/analyzers/job-analyzer/pom.xml
+++ b/tez-tools/analyzers/job-analyzer/pom.xml
@@ -26,10 +26,6 @@
<dependencies>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
@@ -39,9 +35,114 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 448e785..c8d4225 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -33,6 +33,7 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent
import org.apache.tez.analyzer.utils.SVGUtils;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.history.parser.datamodel.Container;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
@@ -49,11 +50,15 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name());
- private final static String DATA_DEPENDENCY = "Data-Dependency";
- private final static String INIT_DEPENDENCY = "Init-Dependency";
- private final static String COMMIT_DEPENDENCY = "Commit-Dependency";
- private final static String NON_DATA_DEPENDENCY = "Non-Data-Dependency";
- private final static String OUTPUT_LOST = "Previous version outputs lost";
+ public enum CriticalPathDependency {
+ DATA_DEPENDENCY,
+ INIT_DEPENDENCY,
+ COMMIT_DEPENDENCY,
+ RETRY_DEPENDENCY,
+ OUTPUT_RECREATE_DEPENDENCY
+ }
+
+ public static final String DRAW_SVG = "tez.critical-path-analyzer.draw-svg";
public static class CriticalPathStep {
public enum EntityType {
@@ -64,7 +69,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
EntityType type;
TaskAttemptInfo attempt;
- String reason; // reason linking this to the previous step on the critical path
+ CriticalPathDependency reason; // reason linking this to the previous step on the critical path
long startCriticalPathTime; // time at which attempt is on critical path
long stopCriticalPathTime; // time at which attempt is off critical path
List<String> notes = Lists.newLinkedList();
@@ -85,7 +90,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
public long getStopCriticalTime() {
return stopCriticalPathTime;
}
- public String getReason() {
+ public CriticalPathDependency getReason() {
return reason;
}
public List<String> getNotes() {
@@ -128,7 +133,13 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
analyzeCriticalPath(dagInfo);
- saveCriticalPathAsSVG(dagInfo);
+ if (getConf().getBoolean(DRAW_SVG, true)) {
+ saveCriticalPathAsSVG(dagInfo);
+ }
+ }
+
+ public List<CriticalPathStep> getCriticalPath() {
+ return criticalPath;
}
private void saveCriticalPathAsSVG(DagInfo dagInfo) {
@@ -212,7 +223,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
// add the commit step
currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
- currentStep.reason = COMMIT_DEPENDENCY;
+ currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
tempCP.add(currentStep);
while (true) {
@@ -233,7 +244,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
long startCriticalPathTime = 0;
String nextAttemptId = null;
- String reason = null;
+ CriticalPathDependency reason = null;
if (dataDependency) {
// last data event was produced after the attempt was scheduled. use
// data dependency
@@ -242,7 +253,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
// there is a valid data causal TA. Use it.
nextAttemptId = currentAttempt.getLastDataEventSourceTA();
- reason = DATA_DEPENDENCY;
+ reason = CriticalPathDependency.DATA_DEPENDENCY;
startCriticalPathTime = currentAttempt.getLastDataEventTime();
System.out.println("Using data dependency " + nextAttemptId);
} else {
@@ -252,7 +263,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
"Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event "
+ "TA is null for " + currentAttempt.getTaskAttemptId());
nextAttemptId = null;
- reason = INIT_DEPENDENCY;
+ reason = CriticalPathDependency.INIT_DEPENDENCY;
System.out.println("Using init dependency");
}
} else {
@@ -262,9 +273,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
// there is a scheduling causal TA. Use it.
nextAttemptId = currentAttempt.getCreationCausalTA();
- reason = NON_DATA_DEPENDENCY;
+ reason = CriticalPathDependency.RETRY_DEPENDENCY;
TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
- if (nextAttempt != null) {
+ if (nextAttemptId != null) {
VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){
@@ -272,7 +283,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
if (nextVertex.getVertexName().equals(outVertex.getVertexName())) {
// next vertex is an output vertex
- reason = OUTPUT_LOST;
+ reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
break;
}
}
@@ -286,7 +297,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
// there is a data event going to the vertex. Count the time between data event and
// scheduling time as Initializer/Manager overhead and follow data dependency
nextAttemptId = currentAttempt.getLastDataEventSourceTA();
- reason = DATA_DEPENDENCY;
+ reason = CriticalPathDependency.DATA_DEPENDENCY;
startCriticalPathTime = currentAttempt.getLastDataEventTime();
long overhead = currentAttempt.getCreationTime()
- currentAttempt.getLastDataEventTime();
@@ -299,17 +310,102 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
// or the vertex has external input but does not use events
// or the vertex has no external inputs or edges
nextAttemptId = null;
- reason = INIT_DEPENDENCY;
+ reason = CriticalPathDependency.INIT_DEPENDENCY;
System.out.println("Using init dependency");
}
}
}
-
+
+
+ if (!Strings.isNullOrEmpty(nextAttemptId)) {
+ TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
+ TaskAttemptInfo attemptToCheck = nextAttempt;
+
+ // check if the next attempt is already on critical path to prevent infinite loop
+ boolean foundLoop = false;
+ CriticalPathDependency prevReason = null;
+ for (CriticalPathStep previousStep : tempCP) {
+ if (previousStep.attempt.equals(attemptToCheck)) {
+ foundLoop = true;
+ prevReason = previousStep.reason;
+ }
+ }
+
+ if (foundLoop) {
+ // found a loop - find the next step based on heuristics
+ /* only the losing outputs causes us to backtrack. There are 2 cases
+ * 1) Step N reported last data event to this step
+ * -> Step N+1 (current step) is the retry for read error reported
+ * -> read error was reported by the Step N attempt and it did not exit after the
+ * error
+ * -> So scheduling dependency of Step N points back to step N+1
+ * 2) Step N reported last data event to this step
+ * -> Step N+1 is a retry for a read error reported
+ * -> Step N+2 is the attempt that reported the read error
+ * -> Step N+3 is the last data event of N+2 and points back to N+1
+ */
+ System.out.println("Reset " + currentAttempt.getTaskAttemptId()
+ + " cause: " + currentAttempt.getTerminationCause()
+ + " time: " + currentAttempt.getFinishTime()
+ + " reason: " + reason
+ + " because of: " + attemptToCheck.getTaskAttemptId());
+ TaskAttemptInfo attemptWithLostAncestor = currentAttempt;
+ if (reason != CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
+ // Case 2 above. If reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY
+ // then its Case 1 above
+ Preconditions.checkState(prevReason.equals(
+ CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), prevReason);
+ reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
+ attemptWithLostAncestor = nextAttempt;
+ }
+ System.out.println("Reset " + currentAttempt.getTaskAttemptId()
+ + " cause: " + currentAttempt.getTerminationCause()
+ + " time: " + currentAttempt.getFinishTime()
+ + " reason: " + reason
+ + " because of: " + attemptToCheck.getTaskAttemptId()
+ + " looking at: " + attemptWithLostAncestor.getTaskAttemptId());
+ Preconditions.checkState(reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY);
+ // we dont track all input events to the consumer. So just jump to
+ // the previous successful version of the current attempt
+ TaskAttemptInfo prevSuccAttempt = null;
+ for (TaskAttemptInfo prevAttempt : attemptWithLostAncestor.getTaskInfo().getTaskAttempts()) {
+ System.out.println("Looking at " + prevAttempt.getTaskAttemptId()
+ + " cause: " + prevAttempt.getTerminationCause() +
+ " time: " + prevAttempt.getFinishTime());
+ if (prevAttempt.getTerminationCause()
+ .equals(TaskAttemptTerminationCause.OUTPUT_LOST.name())) {
+ if (prevAttempt.getFinishTime() < currentAttempt.getFinishTime()) {
+ // attempt finished before current attempt
+ if (prevSuccAttempt == null
+ || prevAttempt.getFinishTime() > prevSuccAttempt.getFinishTime()) {
+ // keep the latest attempt that had lost outputs
+ prevSuccAttempt = prevAttempt;
+ }
+ }
+ }
+ }
+ Preconditions.checkState(prevSuccAttempt != null,
+ attemptWithLostAncestor.getTaskAttemptId());
+ System.out
+ .println("Resetting nextAttempt to : " + prevSuccAttempt.getTaskAttemptId()
+ + " from " + nextAttempt.getTaskAttemptId());
+ nextAttemptId = prevSuccAttempt.getTaskAttemptId();
+ if (attemptWithLostAncestor == currentAttempt) {
+ startCriticalPathTime = currentAttempt.getCreationTime();
+ } else {
+ startCriticalPathTime = prevSuccAttempt.getFinishTime();
+ }
+ }
+
+ }
+
currentStep.startCriticalPathTime = startCriticalPathTime;
currentStep.reason = reason;
+
+ Preconditions.checkState(currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime);
if (Strings.isNullOrEmpty(nextAttemptId)) {
- Preconditions.checkState(reason.equals(INIT_DEPENDENCY));
+ Preconditions.checkState(reason.equals(CriticalPathDependency.INIT_DEPENDENCY));
Preconditions.checkState(startCriticalPathTime == 0);
// no predecessor attempt found. this is the last step in the critical path
// assume attempts start critical path time is when its scheduled. before that is
@@ -321,7 +417,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT);
currentStep.stopCriticalPathTime = initStepStopCriticalTime;
currentStep.startCriticalPathTime = dagInfo.getStartTime();
- currentStep.reason = INIT_DEPENDENCY;
+ currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY;
tempCP.add(currentStep);
if (!tempCP.isEmpty()) {
@@ -348,7 +444,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId()
: (step.getType() == EntityType.VERTEX_INIT
? step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT"));
- String [] record = {entity, step.getReason(),
+ String [] record = {entity, step.getReason().name(),
step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()),
String.valueOf(step.getStopCriticalTime()),
Joiner.on(";").join(step.getNotes())};
http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 50fe033..44408d4 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -252,19 +252,19 @@ public class SVGUtils {
// draw legend
int legendX = 0;
int legendY = (criticalPath.size() + 2) * STEP_GAP;
- int legendWidth = 10000;
+ int legendWidth = dagFinishTimeInterval/5;
- addRectStr(legendX, legendWidth, legendY, STEP_GAP, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, "");
- addTextStr(legendX, legendY + STEP_GAP/2, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, "");
- legendY += STEP_GAP;
- addRectStr(legendX, legendWidth, legendY, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
- addTextStr(legendX, legendY + STEP_GAP/2, "Task Allocation Overhead", "left", TEXT_SIZE, "");
- legendY += STEP_GAP;
- addRectStr(legendX, legendWidth, legendY, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
- addTextStr(legendX, legendY + STEP_GAP/2, "Task Launch Overhead", "left", TEXT_SIZE, "");
- legendY += STEP_GAP;
- addRectStr(legendX, legendWidth, legendY, STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, "");
- addTextStr(legendX, legendY + STEP_GAP/2, "Task Execution Time", "left", TEXT_SIZE, "");
+ addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+ addTextStr(legendX, legendY + STEP_GAP/3, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, "");
+ legendY += STEP_GAP/2;
+ addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+ addTextStr(legendX, legendY + STEP_GAP/3, "Task Allocation Overhead", "left", TEXT_SIZE, "");
+ legendY += STEP_GAP/2;
+ addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+ addTextStr(legendX, legendY + STEP_GAP/3, "Task Launch Overhead", "left", TEXT_SIZE, "");
+ legendY += STEP_GAP/2;
+ addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, "");
+ addTextStr(legendX, legendY + STEP_GAP/3, "Task Execution Time", "left", TEXT_SIZE, "");
Y_MAX += Y_BASE*2;
X_MAX += X_BASE*2;
http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
new file mode 100644
index 0000000..9a75461
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -0,0 +1,662 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathDependency;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.history.ATSImportTool;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.test.SimpleTestDAG;
+import org.apache.tez.test.SimpleTestDAG3Vertices;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestProcessor;
+import org.apache.tez.test.dag.SimpleReverseVTestDAG;
+import org.apache.tez.test.dag.SimpleVTestDAG;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class TestAnalyzer {
+ private static final Logger LOG = LoggerFactory.getLogger(TestAnalyzer.class);
+
+ // Local base directory: used as the MiniDFSCluster base dir and as the parent of DOWNLOAD_DIR.
+ private static String TEST_ROOT_DIR =
+ "target" + Path.SEPARATOR + TestAnalyzer.class.getName() + "-tmpDir";
+ // Passed to ATSImportTool as --downloadDir; getDagInfo reads <dagId>/<dagId>.zip from here.
+ private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
+ // Value for TEZ_SIMPLE_HISTORY_LOGGING_DIR. NOTE(review): the logging service configured in
+ // setupTezCluster is ATSHistoryLoggingService, so confirm whether this setting is still needed.
+ private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+
+ // Mini clusters shared by every test in this class: started by setupClass/setupTezCluster,
+ // stopped by tearDownClass.
+ private static MiniDFSCluster dfsCluster;
+ private static MiniTezClusterWithTimeline miniTezCluster;
+
+ // Shared configuration; setupClass replaces it with a fresh instance before use.
+ private static Configuration conf = new Configuration();
+ private static FileSystem fs;
+
+ // Single session-mode TezClient to which every test submits its DAG.
+ private static TezClient tezSession = null;
+
+ // Count of DAGs submitted to tezSession; runDAGAndVerify uses it to rebuild the latest DAG id.
+ private static int numDAGs = 0;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+ EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ fs = dfsCluster.getFileSystem();
+ conf.set("fs.defaultFS", fs.getUri().toString());
+
+ setupTezCluster();
+ numDAGs = 0;
+ }
+
+  /**
+   * Stops the Tez session, then the Tez/timeline mini cluster, then the mini DFS cluster.
+   * Each step runs in a finally block so that an exception while stopping one component does
+   * not leave the remaining clusters (and their threads/ports) running after the class.
+   */
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    LOG.info("Stopping mini clusters");
+    try {
+      if (tezSession != null) {
+        tezSession.stop();
+        tezSession = null;
+      }
+    } finally {
+      try {
+        if (miniTezCluster != null) {
+          miniTezCluster.stop();
+          miniTezCluster = null;
+        }
+      } finally {
+        if (dfsCluster != null) {
+          dfsCluster.shutdown();
+          dfsCluster = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a CriticalPathAnalyzer backed by a Configuration that loads no default resources and
+   * has DRAW_SVG turned off, so analyze() computes the path without calling saveCriticalPathAsSVG.
+   */
+  public CriticalPathAnalyzer setupCPAnalyzer() {
+    CriticalPathAnalyzer analyzer = new CriticalPathAnalyzer();
+    Configuration noDefaults = new Configuration(false);
+    noDefaults.setBoolean(CriticalPathAnalyzer.DRAW_SVG, false);
+    analyzer.setConf(noDefaults);
+    return analyzer;
+  }
+
+  /**
+   * Applies shuffle and ATS history settings to the shared configuration, starts a
+   * MiniTezClusterWithTimeline, and starts the shared session-mode TezClient that every test
+   * submits its DAG to. The AM staging dir is a randomly named directory on the mini DFS.
+   */
+  public static void setupTezCluster() throws Exception {
+    // Short shuffle timeouts and a low fetch-failure limit.
+    // NOTE(review): presumably so that read errors surface quickly in these tests; confirm.
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    //Enable per edge counters
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
+        .class.getName());
+
+    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
+
+    miniTezCluster =
+        new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 4, 1, 1, true);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+
+    // Client-side configuration derived from the running cluster; ATS logging is re-asserted here.
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+
+    Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+
+    // Name the session after this test class (it was a copy-pasted "TestFaultTolerance").
+    tezSession = TezClient.create(TestAnalyzer.class.getSimpleName(), tezConf, true);
+    tezSession.start();
+  }
+
+  /** Convenience factory for one expected critical path step. */
+  StepCheck createStep(String attempt, CriticalPathDependency reason) {
+    return new StepCheck(attempt, reason);
+  }
+
+  /**
+   * One expected critical path step: a regex that the attempt's short name must match, and the
+   * dependency reason that links the step to the previous one. The class is static because it
+   * never uses the enclosing test instance, and it is immutable because its fields are final.
+   */
+  static class StepCheck {
+    // Regex matched against TaskAttemptInfo#getShortName().
+    final String attempt;
+    final CriticalPathDependency reason;
+
+    StepCheck(String attempt, CriticalPathDependency reason) {
+      this.attempt = attempt;
+      this.reason = reason;
+    }
+
+    String getAttemptDetail() {
+      return attempt;
+    }
+
+    CriticalPathDependency getReason() {
+      return reason;
+    }
+  }
+
+ DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception {
+ tezSession.waitTillReady();
+ numDAGs++;
+ LOG.info("XXX Running DAG name: " + dag.getName());
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.getDAGStatus(null);
+ while (!dagStatus.isCompleted()) {
+ LOG.info("Waiting for dag to complete. Sleeping for 500ms."
+ + " DAG name: " + dag.getName()
+ + " DAG appContext: " + dagClient.getExecutionContext()
+ + " Current state: " + dagStatus.getState());
+ Thread.sleep(100);
+ dagStatus = dagClient.getDAGStatus(null);
+ }
+
+ Assert.assertEquals(finalState, dagStatus.getState());
+
+ String dagId = TezDAGID.getInstance(tezSession.getAppMasterApplicationId(), numDAGs).toString();
+ DagInfo dagInfo = getDagInfo(dagId);
+
+ verifyCriticalPath(dagInfo, steps);
+ return dagInfo;
+ }
+
+  /**
+   * Exports the history of {@code dagId} from ATS into DOWNLOAD_DIR with ATSImportTool, then
+   * parses the downloaded {@code <dagId>/<dagId>.zip} into a DagInfo.
+   */
+  DagInfo getDagInfo(String dagId) throws Exception {
+    // sleep for a bit to let ATS events be sent from AM
+    Thread.sleep(1000);
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    // assertEquals reports the actual exit code on failure, unlike assertTrue(result == 0).
+    Assert.assertEquals("ATSImportTool failed for " + dagId, 0, result);
+
+    //Parse downloaded contents
+    File downloadedFile = new File(DOWNLOAD_DIR
+        + Path.SEPARATOR + dagId
+        + Path.SEPARATOR + dagId + ".zip");
+    ATSFileParser parser = new ATSFileParser(downloadedFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    Assert.assertEquals(dagId, dagInfo.getDagId());
+    return dagInfo;
+  }
+
+  /**
+   * Runs the critical path analyzer over {@code dagInfo} and checks the result
+   * against the expected attempt sequences.
+   * <p>
+   * Each option lists only the attempt steps. The computed path must also start
+   * with a VERTEX_INIT step, which refers to the same attempt as the first attempt
+   * step, and end with a DAG_COMMIT step. Only the first option whose length fits
+   * the computed path is checked step by step; later options are not tried.
+   *
+   * @param dagInfo parsed history of the DAG to analyze
+   * @param stepsOptions alternative expected attempt sequences
+   * @throws Exception if the analyzer fails
+   */
+  void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> stepsOptions) throws Exception {
+    CriticalPathAnalyzer cp = setupCPAnalyzer();
+    cp.analyze(dagInfo);
+
+    List<CriticalPathStep> criticalPath = cp.getCriticalPath();
+
+    for (CriticalPathStep step : criticalPath) {
+      LOG.info("XXX Step: " + step.getType());
+      if (step.getType() == EntityType.ATTEMPT) {
+        LOG.info("XXX Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
+      }
+      LOG.info("XXX Reason: " + step.getReason());
+      String notes = Joiner.on(";").join(step.getNotes());
+      LOG.info("XXX Notes: " + notes);
+    }
+
+    boolean foundMatchingLength = false;
+    for (StepCheck[] steps : stepsOptions) {
+      // +2 accounts for the leading VERTEX_INIT and trailing DAG_COMMIT steps.
+      if (steps.length + 2 == criticalPath.size()) {
+        foundMatchingLength = true;
+        Assert.assertEquals(CriticalPathStep.EntityType.VERTEX_INIT, criticalPath.get(0).getType());
+        Assert.assertEquals("VERTEX_INIT step should refer to the first attempt on the path",
+            criticalPath.get(1).getAttempt().getShortName(),
+            criticalPath.get(0).getAttempt().getShortName());
+
+        for (int i = 1; i < criticalPath.size() - 1; ++i) {
+          CriticalPathStep step = criticalPath.get(i);
+          StepCheck expected = steps[i - 1];
+          Assert.assertEquals("Unexpected step type at position " + i,
+              CriticalPathStep.EntityType.ATTEMPT, step.getType());
+          String actualName = step.getAttempt().getShortName();
+          Assert.assertTrue("Attempt " + actualName + " at position " + i
+              + " does not match " + expected.getAttemptDetail(),
+              actualName.matches(expected.getAttemptDetail()));
+          Assert.assertEquals("Unexpected reason for attempt " + actualName,
+              expected.getReason(), step.getReason());
+        }
+
+        Assert.assertEquals(CriticalPathStep.EntityType.DAG_COMMIT,
+            criticalPath.get(criticalPath.size() - 1).getType());
+        break;
+      }
+    }
+
+    Assert.assertTrue("No expected step sequence fits the computed critical path of "
+        + criticalPath.size() + " steps (including VERTEX_INIT and DAG_COMMIT)",
+        foundMatchingLength);
+  }
+
+  /**
+   * One task per vertex, no failures. The expected path is v1 attempt 0
+   * (init dependency) followed by v2 attempt 0 (data dependency).
+   */
+  @Test (timeout=60000)
+  public void testBasicSuccessScatterGather() throws Exception {
+    final String dagName = "testBasicSuccessScatterGather";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY)
+    };
+    runDAGAndVerify(SimpleTestDAG.createDAG(dagName, conf), DAGStatus.State.SUCCEEDED,
+        Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * The v1 processor is configured to fail task 0 up to attempt 0. The expected
+   * path runs through the retry (v1 attempt 1) before v2 attempt 0.
+   */
+  @Test (timeout=60000)
+  public void testBasicTaskFailure() throws Exception {
+    final String failingVertex = "v1";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, failingVertex), true);
+    conf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, failingVertex), "0");
+    conf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, failingVertex), 0);
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+    DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * The v1 processor is configured to fail task 0 up to attempt 1. The expected
+   * path contains two retries of v1 before v2 attempt 0.
+   */
+  @Test (timeout=60000)
+  public void testTaskMultipleFailures() throws Exception {
+    final String failingVertex = "v1";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, failingVertex), true);
+    conf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, failingVertex), "0");
+    conf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, failingVertex), 1);
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v1 : 000000_2", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * v2 task 0 attempt 0 fails on input 0 and exits. The expected path is: v1
+   * output is consumed, v1 output is recreated, then v2 attempt 1 consumes it.
+   */
+  @Test (timeout=60000)
+  public void testBasicInputFailureWithExit() throws Exception {
+    final String consumer = "v2";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, consumer), true);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, consumer), true);
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, consumer), "0");
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * v2 task 0 attempt 0 fails on input 0 but keeps running. The expected path is:
+   * v1 output is recreated, then the same v2 attempt consumes it.
+   */
+  @Test (timeout=60000)
+  public void testBasicInputFailureWithoutExit() throws Exception {
+    final String consumer = "v2";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, consumer), true);
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, consumer), "0");
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * v2 task 0 attempts 0 and 1 fail on input 0 (up to input version 1) and exit.
+   * The expected path alternates v1 output recreation and v2 reruns twice.
+   */
+  @Test (timeout=60000)
+  public void testMultiVersionInputFailureWithExit() throws Exception {
+    final String consumer = "v2";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, consumer), true);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, consumer), true);
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, consumer), "0,1");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, consumer), "0");
+    conf.setInt(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, consumer), 1);
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithExit", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * v2 task 0 attempt 0 fails on input 0 up to input version 1 without exiting.
+   * The expected path recreates v1 output twice before v2 attempt 0 completes.
+   */
+  @Test (timeout=60000)
+  public void testMultiVersionInputFailureWithoutExit() throws Exception {
+    final String consumer = "v2";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, consumer), true);
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, consumer), "0");
+    conf.setInt(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, consumer), 1);
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+ /**
+ * Sets configuration for cascading input failure tests that
+ * use SimpleTestDAG3Vertices.
+ * @param testConf configuration
+ * @param failAndExit whether input failure should trigger attempt exit
+ */
+ private void setCascadingInputFailureConfig(Configuration testConf,
+ boolean failAndExit) {
+ // v2 attempt0 succeeds.
+ // v2 task0 attempt1 input0 fails up to version 0.
+ testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), failAndExit);
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+ testConf.setInt(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"),
+ 0);
+
+ //v3 all-tasks attempt0 input0 fails up to version 0.
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), failAndExit);
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "0");
+ testConf.setInt(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"),
+ 0);
+ }
+
+  /**
+   * Tests a cascading input failure without exit; the DAG is expected to succeed.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails and waits, triggering a v2 rerun.
+   * v2 task0 attempt1 input0 fails and waits, triggering a v1 rerun.
+   * v1 attempt1 reruns and succeeds. v2 accepts the v1 attempt1 output, and v2 attempt1 succeeds.
+   * v3 attempt0 accepts the v2 attempt1 output.
+   *
+   * The AM vertex succeeded order is v1, v2, v1, v2, v3.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
+    final String dagName = "testCascadingInputFailureWithoutExitSuccess";
+    Configuration conf = new Configuration(false);
+    setCascadingInputFailureConfig(conf, false);
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    runDAGAndVerify(SimpleTestDAG3Vertices.createDAG(dagName, conf),
+        DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * Tests a cascading input failure with exit; the DAG is expected to succeed.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails and v3 attempt0 exits, triggering a v2 rerun.
+   * v2 task0 attempt1 input0 fails and v2 attempt1 exits, triggering a v1 rerun.
+   * v1 attempt1 reruns and succeeds. v2 accepts the v1 attempt1 output, and v2 attempt2 succeeds.
+   * v3 attempt1 accepts the v2 attempt2 output.
+   *
+   * The AM vertex succeeded order is v1, v2, v3, v1, v2, v3.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithExitSuccess() throws Exception {
+    final String dagName = "testCascadingInputFailureWithExitSuccess";
+    Configuration conf = new Configuration(false);
+    setCascadingInputFailureConfig(conf, true);
+
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    runDAGAndVerify(SimpleTestDAG3Vertices.createDAG(dagName, conf),
+        DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * An input failure of v3 causes reruns of both the v1 and v2 vertices.
+   * v1 v2
+   *  \ /
+   *  v3
+   *
+   * v3 task 0 attempt 0 fails on every input (index -1) up to input version 1,
+   * without exiting.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception {
+    final String consumer = "v3";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, consumer), true);
+    conf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, consumer), false);
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, consumer), "0");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, consumer), "-1");
+    conf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, consumer), "1");
+
+    // Either upstream vertex may appear on the path, hence the v[12] regex.
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v[12] : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v[12] : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+
+    DAG dag = SimpleVTestDAG.createDAG(
+        "testInputFailureCausesRerunOfTwoVerticesWithoutExit", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * Attempt failures in a downstream vertex (v3) that has two upstream vertices.
+   * v1 v2
+   *  \ /
+   *  v3
+   *
+   * The v3 processor is configured to fail task 0 up to attempt 1, so the
+   * expected path ends with two v3 retries.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() throws Exception {
+    final String failingVertex = "v3";
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1);
+    conf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, failingVertex), true);
+    conf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, failingVertex), "0");
+    conf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, failingVertex), 1);
+
+    // Either upstream vertex may appear on the path, hence the v[12] regex.
+    StepCheck[] expectedPath = new StepCheck[] {
+        createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY),
+        createStep("v3 : 000000_2", CriticalPathDependency.RETRY_DEPENDENCY),
+    };
+
+    DAG dag = SimpleVTestDAG.createDAG(
+        "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(expectedPath));
+  }
+
+  /**
+   * Input failures of v2 and v3 trigger a v1 rerun.
+   * Both v2 and v3 report an error against v1 and don't exit. One of them
+   * triggers the next version of v1 and consumes its output; the other also
+   * consumes the output of that next version.
+   * This shows that reruns can send output to two downstream vertices.
+   *   v1
+   *  /  \
+   * v2  v3
+   *
+   * Also covers multiple consumer vertices reporting failures against the same producer task.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.setInt(SimpleReverseVTestDAG.TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, 1);
+    // v2 and v3 share an identical failure setup: every task, attempt 0,
+    // every input, up to input version 0, without exiting.
+    for (String consumer : new String[] {"v2", "v3"}) {
+      conf.setBoolean(TestInput.getVertexConfName(
+          TestInput.TEZ_FAILING_INPUT_DO_FAIL, consumer), true);
+      conf.setBoolean(TestInput.getVertexConfName(
+          TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, consumer), false);
+      conf.set(TestInput.getVertexConfName(
+          TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, consumer), "-1");
+      conf.set(TestInput.getVertexConfName(
+          TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, consumer), "0");
+      conf.set(TestInput.getVertexConfName(
+          TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, consumer), "-1");
+      conf.set(TestInput.getVertexConfName(
+          TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, consumer), "0");
+    }
+
+    // Either consumer may appear on the path, hence the v[23] regex.
+    StepCheck[] viaFailedConsumer = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v[23] : 000000_0", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+    StepCheck[] directRecreate = new StepCheck[] {
+        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+    };
+    List<StepCheck[]> pathOptions = Lists.newLinkedList();
+    pathOptions.add(viaFailedConsumer);
+    pathOptions.add(directRecreate);
+
+    DAG dag = SimpleReverseVTestDAG.createDAG(
+        "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", conf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, pathOptions);
+  }
+
+}
\ No newline at end of file