You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/25 20:10:14 UTC
tez git commit: TEZ-2690. Add critical path analyser (bikas)
Repository: tez
Updated Branches:
refs/heads/master 2e62e98ec -> 24e17a4e5
TEZ-2690. Add critical path analyser (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24e17a4e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24e17a4e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24e17a4e
Branch: refs/heads/master
Commit: 24e17a4e5af7d81a08ca2019052896cf3b97f6a6
Parents: 2e62e98
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Aug 25 11:10:10 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Aug 25 11:10:10 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../parser/datamodel/TaskAttemptInfo.java | 43 ++
.../tez/history/parser/datamodel/TaskInfo.java | 9 +
.../history/parser/datamodel/VertexInfo.java | 20 +-
tez-tools/analyzers/job-analyzer/pom.xml | 16 +-
.../tez/analyzer/plugins/AnalyzerDriver.java | 39 ++
.../analyzer/plugins/CriticalPathAnalyzer.java | 398 ++++++++++++----
.../tez/analyzer/plugins/TezAnalyzerBase.java | 117 +++++
.../VertexLevelCriticalPathAnalyzer.java | 142 ++++++
.../org/apache/tez/analyzer/utils/SVGUtils.java | 461 ++++++++++---------
10 files changed, 953 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9fa3d33..d2c39e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2690. Add critical path analyser
TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
TEZ-2687. ATS History shutdown happens before the min-held containers are released
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index ba676a2..ccec0db 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -19,7 +19,9 @@
package org.apache.tez.history.parser.datamodel;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.common.ATSConstants;
@@ -29,6 +31,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
+import java.util.Comparator;
import java.util.Map;
import static org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -53,6 +56,7 @@ public class TaskAttemptInfo extends BaseInfo {
private final long lastDataEventTime;
private final String lastDataEventSourceTA;
private final String terminationCause;
+ private final long executionTimeInterval;
private TaskInfo taskInfo;
@@ -88,6 +92,17 @@ public class TaskAttemptInfo extends BaseInfo {
otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
terminationCause = StringInterner
.weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
+ executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0;
+ }
+
+ /**
+ * Returns an ordering that sorts attempts by ascending allocation time.
+ *
+ * @return a new {@link Ordering} over {@link #getAllocationTime()} values
+ */
+ public static Ordering<TaskAttemptInfo> orderingOnAllocationTime() {
+ return Ordering.from(new Comparator<TaskAttemptInfo>() {
+ @Override
+ public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+ // Long.compare (Java 7+) is the standard three-way comparison for longs
+ return Long.compare(o1.getAllocationTime(), o2.getAllocationTime());
+ }
+ });
}
void setTaskInfo(TaskInfo taskInfo) {
@@ -105,6 +120,22 @@ public class TaskAttemptInfo extends BaseInfo {
public final long getFinishTimeInterval() {
return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
}
+
+ /**
+ * @return the attempt's run time ({@code endTime - startTime}), precomputed when this object
+ * is built and clamped to 0 when the end time is not after the start time
+ */
+ public final long getExecutionTimeInterval() {
+ return this.executionTimeInterval;
+ }
+
+ /**
+ * @return {@code endTime - allocationTime}; unlike the execution interval this is not
+ * clamped, so it can be negative
+ */
+ public final long getAllocationToEndTimeInterval() {
+ return this.endTime - this.allocationTime;
+ }
+
+ /**
+ * @return {@code startTime - allocationTime}, i.e. how long the attempt waited between
+ * allocation and start; not clamped, so it can be negative
+ */
+ public final long getAllocationToStartTimeInterval() {
+ return this.startTime - this.allocationTime;
+ }
+
+ /**
+ * @return {@code allocationTime - creationTime}, i.e. the wait between creation and
+ * allocation; not clamped, so it can be negative
+ */
+ public final long getCreationToAllocationTimeInterval() {
+ return this.allocationTime - this.creationTime;
+ }
public final long getStartTime() {
return startTime;
@@ -141,6 +172,11 @@ public class TaskAttemptInfo extends BaseInfo {
public final long getAllocationTime() {
return allocationTime;
}
+
+ /**
+ * Builds a compact label {@code "<vertexName> : <suffix>"}, where the suffix is the part of
+ * the attempt id after its second-to-last underscore (the whole id when it contains fewer
+ * than two underscores).
+ */
+ public final String getShortName() {
+ final String vertexName = getTaskInfo().getVertexInfo().getVertexName();
+ final int lastUnderscore = taskAttemptId.lastIndexOf('_');
+ final int suffixStart = taskAttemptId.lastIndexOf('_', lastUnderscore - 1) + 1;
+ return vertexName + " : " + taskAttemptId.substring(suffixStart);
+ }
@Override
public final String getDiagnostics() {
@@ -169,6 +205,13 @@ public class TaskAttemptInfo extends BaseInfo {
}
return false;
}
+
+ /**
+ * @return the attempt status, followed by {@code ":" + terminationCause} when a non-empty
+ * termination cause was recorded
+ */
+ public final String getDetailedStatus() {
+ final String cause = getTerminationCause();
+ return Strings.isNullOrEmpty(cause) ? getStatus() : getStatus() + ":" + cause;
+ }
public final TezCounter getLocalityInfo() {
Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
index 5e63efa..a30d311 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
+import org.apache.directory.api.util.Strings;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.codehaus.jettison.json.JSONException;
@@ -204,6 +205,14 @@ public class TaskInfo extends BaseInfo {
* @return TaskAttemptInfo
*/
public final TaskAttemptInfo getSuccessfulTaskAttempt() {
+ if (Strings.isNotEmpty(getSuccessfulAttemptId())) {
+ for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+ if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) {
+ return attemptInfo;
+ }
+ }
+ }
+ // fall back to checking status if successful attemt id is not available
for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) {
return attemptInfo;
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index 6e227a5..35da2d4 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -74,6 +74,8 @@ public class VertexInfo extends BaseInfo {
private final List<AdditionalInputOutputDetails> additionalInputInfoList;
private final List<AdditionalInputOutputDetails> additionalOutputInfoList;
+
+ private long avgExecutionTimeInterval = -1;
private DagInfo dagInfo;
@@ -143,7 +145,7 @@ public class VertexInfo extends BaseInfo {
this.additionalInputInfoList.clear();
this.additionalInputInfoList.addAll(additionalInputInfoList);
}
-
+
void setAdditionalOutputInfoList(List<AdditionalInputOutputDetails> additionalOutputInfoList) {
this.additionalOutputInfoList.clear();
this.additionalOutputInfoList.addAll(additionalOutputInfoList);
@@ -192,6 +194,22 @@ public class VertexInfo extends BaseInfo {
}
return getLastTaskToFinish().getFinishTimeInterval();
}
+
+ /**
+ * Returns the mean execution time of the successful attempts of this vertex's tasks,
+ * rounded to the nearest whole unit. Tasks without a successful attempt are skipped.
+ * A computed value is cached; while no successful attempt exists the result stays -1 and
+ * is recomputed on each call.
+ *
+ * @return the average execution time, or -1 if no task has a successful attempt
+ */
+ public final long getAvgExecutionTimeInterval() {
+ if (avgExecutionTimeInterval == -1) {
+ long totalExecutionTime = 0;
+ long totalAttempts = 0;
+ for (TaskInfo task : getTasks()) {
+ TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt();
+ if (attempt == null) {
+ // getSuccessfulTaskAttempt() can yield null when no attempt of the task succeeded
+ // (e.g. failed DAGs); skip it instead of throwing a NullPointerException.
+ continue;
+ }
+ totalExecutionTime += attempt.getExecutionTimeInterval();
+ totalAttempts++;
+ }
+ if (totalAttempts > 0) {
+ avgExecutionTimeInterval = Math.round(totalExecutionTime * 1.0 / totalAttempts);
+ }
+ }
+ return avgExecutionTimeInterval;
+ }
public final long getStartTime() {
return startTime;
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml
index 36b12fe..543ba1b 100644
--- a/tez-tools/analyzers/job-analyzer/pom.xml
+++ b/tez-tools/analyzers/job-analyzer/pom.xml
@@ -42,15 +42,21 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
- <dependency>
- <groupId>org.plutext</groupId>
- <artifactId>jaxb-svg11</artifactId>
- <version>1.0.2</version>
- </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.tez.analyzer.plugins.AnalyzerDriver</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
new file mode 100644
index 0000000..33dbead
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.util.ProgramDriver;
+
+/**
+ * Command-line entry point (the jar's Main-Class) that dispatches to the registered
+ * analyzers by name using Hadoop's {@link ProgramDriver}.
+ */
+public class AnalyzerDriver {
+
+ /**
+ * Registers the available analyzers, runs the one selected by the arguments and exits the
+ * JVM with its return code, or with -1 if registration or dispatch throws.
+ *
+ * @param argv analyzer name followed by that analyzer's own arguments
+ */
+ public static void main(String[] argv) {
+ int exitCode = -1;
+ ProgramDriver pgd = new ProgramDriver();
+ try {
+ pgd.addClass("CriticalPath", CriticalPathAnalyzer.class,
+ "Find the critical path of a DAG");
+ exitCode = pgd.run(argv);
+ } catch (Throwable e) {
+ // top-level CLI boundary: report any failure and fall through to exit with -1
+ e.printStackTrace();
+ }
+
+ System.exit(exitCode);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 88d45f3..448e785 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -18,78 +18,342 @@
package org.apache.tez.analyzer.plugins;
-import com.google.common.base.Functions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
-import org.apache.tez.analyzer.utils.Utils;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
+import org.apache.tez.analyzer.utils.SVGUtils;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.Container;
import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Identify a set of vertices which fall in the critical path in a DAG.
- */
-public class CriticalPathAnalyzer implements Analyzer {
- private final Configuration config;
-
- private static final String[] headers = { "CriticalPath", "Score" };
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
- private final CSVResult csvResult;
+public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
- private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
- private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+ String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
+ String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name());
- private final String dotFileLocation;
+ private final static String DATA_DEPENDENCY = "Data-Dependency";
+ private final static String INIT_DEPENDENCY = "Init-Dependency";
+ private final static String COMMIT_DEPENDENCY = "Commit-Dependency";
+ private final static String NON_DATA_DEPENDENCY = "Non-Data-Dependency";
+ private final static String OUTPUT_LOST = "Previous version outputs lost";
- private static final String CONNECTOR = "-->";
+ /**
+ * One segment of the DAG's critical path: which entity was critical, the time window in
+ * which it was critical, why it is linked to the preceding step, and free-form notes
+ * produced while building and analyzing the path.
+ */
+ public static class CriticalPathStep {
+ /** Kind of entity a critical path step represents. */
+ public enum EntityType {
+ // a task attempt; its critical window may begin before the attempt actually started
+ ATTEMPT,
+ // the span from DAG start until the first attempt on the path was created
+ VERTEX_INIT,
+ // the span from the last failed/succeeded attempt's finish until the DAG finished
+ DAG_COMMIT
+ }
- public CriticalPathAnalyzer(Configuration config) {
- this.config = config;
- this.csvResult = new CSVResult(headers);
- this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
+ EntityType type;
+ // for ATTEMPT steps the attempt itself; for VERTEX_INIT / DAG_COMMIT the attempt that
+ // anchors the step (first / last attempt on the path respectively)
+ TaskAttemptInfo attempt;
+ String reason; // reason linking this to the previous step on the critical path
+ long startCriticalPathTime; // time at which attempt is on critical path
+ long stopCriticalPathTime; // time at which attempt is off critical path
+ // human-readable annotations (e.g. straggler, container allocation, scheduling overhead)
+ List<String> notes = Lists.newLinkedList();
+
+ public CriticalPathStep(TaskAttemptInfo attempt, EntityType type) {
+ this.type = type;
+ this.attempt = attempt;
+ }
+ public EntityType getType() {
+ return type;
+ }
+ public TaskAttemptInfo getAttempt() {
+ return attempt;
+ }
+ public long getStartCriticalTime() {
+ return startCriticalPathTime;
+ }
+ public long getStopCriticalTime() {
+ return stopCriticalPathTime;
+ }
+ public String getReason() {
+ return reason;
+ }
+ // returns the live, mutable notes list (not a copy)
+ public List<String> getNotes() {
+ return notes;
+ }
}
+
+ List<CriticalPathStep> criticalPath = Lists.newLinkedList();
+
+ Map<String, TaskAttemptInfo> attempts = Maps.newHashMap();
- @Override public void analyze(DagInfo dagInfo) throws TezException {
- Map<String, Long> result = Maps.newLinkedHashMap();
- getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+ /**
+ * Creates an analyzer with no configuration; one is supplied afterwards (for example by
+ * {@link ToolRunner} in {@code main}).
+ */
+ public CriticalPathAnalyzer() {
+ // nothing to initialize eagerly
+ }
- Map<String, Long> sortedByValues = sortByValues(result);
- for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
- List<String> record = Lists.newLinkedList();
- record.add(entry.getKey());
- record.add(entry.getValue() + "");
- csvResult.addRecord(record.toArray(new String[record.size()]));
+ /**
+ * Computes the critical path of {@code dagInfo}, annotates it with analysis notes and
+ * writes it out as an SVG file. State from a previous call is discarded first, so one
+ * analyzer instance can be reused for several DAGs.
+ */
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ // reset results left over from an earlier analyze() call on this instance
+ criticalPath.clear();
+ attempts.clear();
+ // get all attempts in the dag and find the last failed/succeeded attempt.
+ // ignore killed attempt to handle kills that happen upon dag completion
+ TaskAttemptInfo lastAttempt = null;
+ long lastAttemptFinishTime = 0;
+ for (VertexInfo vertex : dagInfo.getVertices()) {
+ for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+ attempts.put(attempt.getTaskAttemptId(), attempt);
+ // constant-first comparison tolerates a missing status
+ if (succeededState.equals(attempt.getStatus()) ||
+ failedState.equals(attempt.getStatus())) {
+ if (lastAttemptFinishTime < attempt.getFinishTime()) {
+ lastAttempt = attempt;
+ lastAttemptFinishTime = attempt.getFinishTime();
+ }
+ }
+ }
+ }
+
+ if (lastAttempt == null) {
+ System.out.println("Cannot find last attempt to finish in DAG " + dagInfo.getDagId());
+ return;
}
+
+ createCriticalPath(dagInfo, lastAttempt, lastAttemptFinishTime, attempts);
+
+ analyzeCriticalPath(dagInfo);
- String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
- try {
- List<String> criticalVertices = null;
- if (!sortedByValues.isEmpty()) {
- String criticalPath = sortedByValues.keySet().iterator().next();
- criticalVertices = getVertexNames(criticalPath);
- } else {
- criticalVertices = Lists.newLinkedList();
+ saveCriticalPathAsSVG(dagInfo);
+ }
+
+ private void saveCriticalPathAsSVG(DagInfo dagInfo) {
+ SVGUtils svg = new SVGUtils();
+ String outputFileName = getOutputDir() + File.separator + dagInfo.getDagId() + ".svg";
+ System.out.println("Writing output to: " + outputFileName);
+ svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath);
+ }
+
+ /**
+ * Annotates each step of the computed critical path with diagnostic notes (potential
+ * stragglers, container allocation overhead) and prints the DAG run time next to the total
+ * time covered by the steps. Does nothing when no critical path was computed.
+ */
+ private void analyzeCriticalPath(DagInfo dag) {
+ if (!criticalPath.isEmpty()) {
+ System.out.println("Walking critical path for dag " + dag.getDagId());
+ long dagStartTime = dag.getStartTime();
+ long dagTime = dag.getFinishTime() - dagStartTime;
+ long totalAttemptCriticalTime = 0;
+ // criticalPath is a LinkedList: iterate rather than call get(i), which is O(n) per call
+ for (CriticalPathStep step : criticalPath) {
+ totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime);
+ TaskAttemptInfo attempt = step.attempt;
+ if (step.getType() == EntityType.ATTEMPT) {
+ // analyze execution overhead. A non-positive average (-1 when the vertex has no
+ // successful attempts) is not comparable, so skip the straggler check then.
+ long avgExecutionTime = attempt.getTaskInfo().getVertexInfo()
+ .getAvgExecutionTimeInterval();
+ if (avgExecutionTime > 0 && avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
+ step.notes
+ .add("Potential straggler. Execution time " + attempt.getExecutionTimeInterval()
+ + " compared to vertex average of " + avgExecutionTime);
+ }
+
+ if (attempt.getStartTime() > step.startCriticalPathTime) {
+ // the attempt is critical before launching. So allocation overhead needs analysis
+ Container container = attempt.getContainer();
+ if (container != null) {
+ // named to avoid shadowing the attempts field
+ Collection<TaskAttemptInfo> containerAttempts = dag.getContainerMapping().get(container);
+ if (containerAttempts != null && !containerAttempts.isEmpty()) {
+ // arrange attempts by allocation time
+ List<TaskAttemptInfo> attemptsList = Lists.newArrayList(containerAttempts);
+ Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());
+ // walk the list to record allocation time before the current attempt
+ long containerPreviousAllocatedTime = 0;
+ for (TaskAttemptInfo containerAttempt : attemptsList) {
+ if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
+ break;
+ }
+ System.out.println("Container: " + container.getId() + " running att: " +
+ containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId());
+ containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval();
+ }
+ if (containerPreviousAllocatedTime == 0) {
+ step.notes.add("Container " + container.getId() + " newly allocated.");
+ } else {
+ if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
+ step.notes.add("Container " + container.getId() + " was fully allocated");
+ } else {
+ step.notes.add("Container " + container.getId() + " allocated for " +
+ SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
+ SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) +
+ " of allocation wait time");
+ }
+ }
+ }
+ }
+ }
+ }
}
- Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
- } catch (IOException e) {
- throw new TezException(e);
+ System.out
+ .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime
+ + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
}
}
+
+ /**
+ * Walks backwards in time from {@code lastAttempt} along data/scheduling causality links
+ * and appends the resulting steps to {@link #criticalPath} in chronological order: a
+ * VERTEX_INIT step, the chain of ATTEMPT steps, then a DAG_COMMIT step. Progress is printed
+ * to stdout.
+ *
+ * @throws IllegalStateException if a causal predecessor attempt is not present in
+ * {@code attempts}, or the recorded history contradicts the inferred dependency type
+ */
+ private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt,
+ long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) {
+ List<CriticalPathStep> tempCP = Lists.newLinkedList();
+ if (lastAttempt != null) {
+ TaskAttemptInfo currentAttempt = lastAttempt;
+ CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, EntityType.DAG_COMMIT);
+ long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;
+
+ // add the commit step
+ currentStep.stopCriticalPathTime = dagInfo.getFinishTime();
+ currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
+ currentStep.reason = COMMIT_DEPENDENCY;
+ tempCP.add(currentStep);
+ while (true) {
+ Preconditions.checkState(currentAttempt != null);
+ Preconditions.checkState(currentAttemptStopCriticalPathTime > 0);
+ System.out.println(
+ "Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());
+ currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT);
+ currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+ tempCP.add(currentStep);
+
+ // find the next attempt on the critical path
+ boolean dataDependency = false;
+ // find out predecessor dependency
+ if (currentAttempt.getLastDataEventTime() > currentAttempt.getCreationTime()) {
+ dataDependency = true;
+ }
+
+ long startCriticalPathTime = 0;
+ String nextAttemptId = null;
+ String reason = null;
+ if (dataDependency) {
+ // last data event was produced after the attempt was scheduled. use
+ // data dependency
+ // typically case when scheduling ahead of time
+ System.out.println("Has data dependency");
+ if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+ // there is a valid data causal TA. Use it.
+ nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+ reason = DATA_DEPENDENCY;
+ startCriticalPathTime = currentAttempt.getLastDataEventTime();
+ System.out.println("Using data dependency " + nextAttemptId);
+ } else {
+ // there is no valid data causal TA. The data event must then have come from one of
+ // the vertex's external (additional) inputs, which is asserted below
+ VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo();
+ Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(),
+ "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event "
+ + "TA is null for " + currentAttempt.getTaskAttemptId());
+ nextAttemptId = null;
+ reason = INIT_DEPENDENCY;
+ System.out.println("Using init dependency");
+ }
+ } else {
+ // attempt was scheduled after last data event. use scheduling dependency
+ // typically happens for retries
+ System.out.println("Has scheduling dependency");
+ if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
+ // there is a scheduling causal TA. Use it.
+ nextAttemptId = currentAttempt.getCreationCausalTA();
+ reason = NON_DATA_DEPENDENCY;
+ TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
+ if (nextAttempt != null) {
+ VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
+ VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
+ if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){
+ // cause from different vertex. Might be rerun to re-generate outputs
+ for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
+ if (nextVertex.getVertexName().equals(outVertex.getVertexName())) {
+ // next vertex is an output vertex
+ reason = OUTPUT_LOST;
+ break;
+ }
+ }
+ }
+ }
+ startCriticalPathTime = currentAttempt.getCreationTime();
+ System.out.println("Using scheduling dependency " + nextAttemptId);
+ } else {
+ // there is no scheduling causal TA.
+ if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+ // there is a data event going to the vertex. Count the time between data event and
+ // scheduling time as Initializer/Manager overhead and follow data dependency
+ nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+ reason = DATA_DEPENDENCY;
+ startCriticalPathTime = currentAttempt.getLastDataEventTime();
+ long overhead = currentAttempt.getCreationTime()
+ - currentAttempt.getLastDataEventTime();
+ currentStep.notes
+ .add("Initializer/VertexManager scheduling overhead " + overhead + " ms");
+ System.out.println("Using data dependency " + nextAttemptId);
+ } else {
+ // there is no scheduling causal TA and no data event causal TA.
+ // the vertex has external input that sent the last data events
+ // or the vertex has external input but does not use events
+ // or the vertex has no external inputs or edges
+ nextAttemptId = null;
+ reason = INIT_DEPENDENCY;
+ System.out.println("Using init dependency");
+ }
+ }
+ }
+
+ currentStep.startCriticalPathTime = startCriticalPathTime;
+ currentStep.reason = reason;
+
+ if (Strings.isNullOrEmpty(nextAttemptId)) {
+ Preconditions.checkState(reason.equals(INIT_DEPENDENCY));
+ Preconditions.checkState(startCriticalPathTime == 0);
+ // no predecessor attempt found. this is the last step in the critical path
+ // assume attempts start critical path time is when its scheduled. before that is
+ // vertex initialization time
+ currentStep.startCriticalPathTime = currentStep.attempt.getCreationTime();
+
+ // add vertex init step
+ long initStepStopCriticalTime = currentStep.startCriticalPathTime;
+ currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT);
+ currentStep.stopCriticalPathTime = initStepStopCriticalTime;
+ currentStep.startCriticalPathTime = dagInfo.getStartTime();
+ currentStep.reason = INIT_DEPENDENCY;
+ tempCP.add(currentStep);
+
+ // tempCP was built walking backwards in time; publish it in chronological order
+ criticalPath.addAll(Lists.reverse(tempCP));
+ return;
+ }
+
+ currentAttempt = attempts.get(nextAttemptId);
+ // fail with context instead of a bare IllegalStateException on the next iteration
+ Preconditions.checkState(currentAttempt != null,
+ "Critical path predecessor %s of attempt %s not found in the DAG's attempts",
+ nextAttemptId, currentStep.attempt.getTaskAttemptId());
+ currentAttemptStopCriticalPathTime = startCriticalPathTime;
+ }
+ }
+ }
+
@Override
public CSVResult getResult() throws TezException {
+ String[] headers = { "Entity", "PathReason", "Status", "CriticalStartTime",
+ "CriticalStopTime", "Notes" };
+
+ CSVResult csvResult = new CSVResult(headers);
+ for (CriticalPathStep step : criticalPath) {
+ String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId()
+ : (step.getType() == EntityType.VERTEX_INIT
+ ? step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT"));
+ String [] record = {entity, step.getReason(),
+ step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()),
+ String.valueOf(step.getStopCriticalTime()),
+ Joiner.on(";").join(step.getNotes())};
+ csvResult.addRecord(record);
+ }
return csvResult;
}
@@ -105,38 +369,12 @@ public class CriticalPathAnalyzer implements Analyzer {
+ /** @return the configuration held by the Hadoop {@code Configured} base class ({@code getConf()}) */
@Override
public Configuration getConfiguration() {
- return config;
+ return getConf();
}
-
- private static Map<String, Long> sortByValues(Map<String, Long> result) {
- //Sort result by time in reverse order
- final Ordering<String> reversValueOrdering =
- Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
- Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering);
- return orderedMap;
+
+ /**
+ * Standalone entry point: runs this analyzer through {@link ToolRunner} with a fresh
+ * {@link Configuration} and exits the JVM with the tool's return code.
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args));
}
- private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
- Map<String, Long> result) {
- String destVertexName = (dest != null) ? (dest.getVertexName()) : "";
-
- if (dest != null) {
- time += dest.getTimeTaken();
- predecessor += destVertexName + CONNECTOR;
-
- for (VertexInfo incomingVertex : dest.getInputVertices()) {
- getCriticalPath(predecessor, incomingVertex, time, result);
- }
-
- result.put(predecessor, time);
- }
- }
-
- private static List<String> getVertexNames(String criticalPath) {
- if (Strings.isNullOrEmpty(criticalPath)) {
- return Lists.newLinkedList();
- }
- return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
- (criticalPath));
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
new file mode 100644
index 0000000..3eb2f57
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base class for analyzers that are launched from the command line through Hadoop's ToolRunner.
+ * <p/>
+ * {@link #run(String[])} parses the common options ({@code --dagId} and {@code --atsFileName},
+ * both required, plus the optional {@code --outputDir} and {@code --help}), loads the DAG from
+ * the ATS file with {@link ATSFileParser} and hands it to {@link #analyze(DagInfo)}.
+ */
+public abstract class TezAnalyzerBase extends Configured implements Tool, Analyzer {
+
+  private static final String ATS_FILE_NAME = "atsFileName";
+  private static final String OUTPUT_DIR = "outputDir";
+  private static final String DAG_ID = "dagId";
+  private static final String HELP = "help";
+
+  /** Output directory resolved by {@link #run(String[])}; null until the options are parsed. */
+  private String outputDir;
+
+  /** Builds the command-line options shared by every analyzer. */
+  @SuppressWarnings("static-access")
+  private static Options buildOptions() {
+    Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+        .withDescription("DagId that needs to be analyzed").hasArg().isRequired(true).create();
+
+    Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR)
+        .withDescription("Directory to write outputs to.").hasArg().isRequired(false).create();
+
+    Option inputATSFileNameOption = OptionBuilder.withArgName(ATS_FILE_NAME)
+        .withLongOpt(ATS_FILE_NAME)
+        .withDescription("File with ATS data for the DAG").hasArg()
+        .isRequired(true).create();
+
+    Option help = OptionBuilder.withArgName(HELP).withLongOpt(HELP)
+        .withDescription("print help")
+        .isRequired(false).create();
+
+    Options opts = new Options();
+    opts.addOption(dagIdOption);
+    opts.addOption(outputDirOption);
+    opts.addOption(inputATSFileNameOption);
+    opts.addOption(help);
+    return opts;
+  }
+
+  /**
+   * @return the directory given with {@code --outputDir}, or the JVM's {@code user.dir} when the
+   *     option was omitted; null if {@link #run(String[])} has not parsed the options yet
+   */
+  protected String getOutputDir() {
+    return outputDir;
+  }
+
+  /** Prints every base option, its description and whether it is required, to stderr. */
+  private void printUsage() {
+    System.err.println("Analyzer base options are");
+    for (Object obj : buildOptions().getOptions()) {
+      Option option = (Option) obj;
+      System.err.println("--" + option.getLongOpt() + " : " + option.getDescription()
+          + (option.isRequired() ? " (required)" : ""));
+    }
+  }
+
+  /**
+   * Returns true if {@code --help} or {@code -help} appears anywhere in args. This is checked
+   * before the strict parse because the required options would otherwise make a bare
+   * {@code --help} fail with a ParseException.
+   */
+  private static boolean isHelpRequested(String[] args) {
+    if (args == null) {
+      return false;
+    }
+    for (String arg : args) {
+      if (("--" + HELP).equals(arg) || ("-" + HELP).equals(arg)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Parses the command line, loads the requested DAG from the ATS file and analyzes it.
+   *
+   * @param args command-line arguments (the base options described in the class comment)
+   * @return 0 after a successful analysis or when help was requested, -1 for invalid options
+   * @throws IllegalStateException if the parser returns no DAG, or a DAG with a different id
+   * @throws Exception anything thrown while parsing the ATS file or by {@link #analyze(DagInfo)}
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (isHelpRequested(args)) {
+      printUsage();
+      return 0;
+    }
+
+    CommandLine cmdLine;
+    try {
+      cmdLine = new GnuParser().parse(buildOptions(), args);
+    } catch (ParseException e) {
+      System.err.println("Invalid options on command line: " + e.getMessage());
+      printUsage();
+      return -1;
+    }
+
+    outputDir = cmdLine.getOptionValue(OUTPUT_DIR, System.getProperty("user.dir"));
+
+    File file = new File(cmdLine.getOptionValue(ATS_FILE_NAME));
+    String dagId = cmdLine.getOptionValue(DAG_ID);
+
+    ATSFileParser parser = new ATSFileParser(file);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    Preconditions.checkState(dagInfo != null && dagId.equals(dagInfo.getDagId()),
+        "ATS file %s did not contain data for DAG %s", file, dagId);
+    analyze(dagInfo);
+    return 0;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
new file mode 100644
index 0000000..9661ea3
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify a set of vertices which fall in the critical path in a DAG.
+ * <p/>
+ * Starting at the last vertex in {@link DagInfo#getVertices()}, every path that walks back over
+ * {@link VertexInfo#getInputVertices()} is scored with the sum of
+ * {@link VertexInfo#getTimeTaken()} of its vertices. Every visited path prefix is added to the
+ * CSV result, highest score first. The vertex names of the top-scoring path are passed to
+ * {@link Utils#generateDAGVizFile} for the {@code <dagId>.dot} file, which is written under the
+ * directory configured by {@code tez.critical-path.analyzer.dot.output.loc} (default ".").
+ */
+public class VertexLevelCriticalPathAnalyzer implements Analyzer {
+  private final Configuration config;
+
+  private static final String[] headers = { "CriticalPath", "Score" };
+
+  private final CSVResult csvResult;
+
+  private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+  private final String dotFileLocation;
+
+  /** Separator appended after each vertex name in a path key. */
+  private static final String CONNECTOR = "-->";
+
+  public VertexLevelCriticalPathAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
+  }
+
+  /**
+   * Scores the vertex paths of the DAG, records them in the CSV result and writes the .dot file.
+   *
+   * @throws TezException wrapping an IOException raised while writing the .dot file
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Long> result = Maps.newLinkedHashMap();
+    List<VertexInfo> vertices = dagInfo.getVertices();
+    // NOTE(review): assumes the last vertex in the list is the DAG's (only) sink; other sinks of
+    // a multi-sink DAG are not explored. An empty DAG simply yields no paths.
+    if (!vertices.isEmpty()) {
+      getCriticalPath("", vertices.get(vertices.size() - 1), 0, result);
+    }
+
+    Map<String, Long> sortedByValues = sortByValues(result);
+    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
+      csvResult.addRecord(new String[] { entry.getKey(), String.valueOf(entry.getValue()) });
+    }
+
+    // The first entry of the sorted map is the highest-scoring path.
+    List<String> criticalVertices = sortedByValues.isEmpty()
+        ? Lists.<String>newLinkedList()
+        : getVertexNames(sortedByValues.keySet().iterator().next());
+    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+    try {
+      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    // Distinct from the task-attempt level CriticalPathAnalyzer.
+    return "VertexLevelCriticalPathAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze vertex level critical path of the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Returns a copy of result ordered by value, largest first. Paths with equal scores are ordered
+   * by their path string. Without that tiebreak the value-only comparator reports them as equal,
+   * and ImmutableSortedMap.copyOf rejects them as duplicate keys.
+   */
+  private static Map<String, Long> sortByValues(Map<String, Long> result) {
+    final Ordering<String> byValueDescending =
+        Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
+    final Ordering<String> ordering = byValueDescending.compound(Ordering.<String>natural());
+    return ImmutableSortedMap.copyOf(result, ordering);
+  }
+
+  /**
+   * Walks backwards from dest over its input vertices. Every visited path prefix is put into
+   * result, keyed by "name1-->name2-->..." and mapped to the summed time taken of its vertices.
+   */
+  private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
+      Map<String, Long> result) {
+    if (dest == null) {
+      return;
+    }
+    time += dest.getTimeTaken();
+    predecessor += dest.getVertexName() + CONNECTOR;
+
+    for (VertexInfo incomingVertex : dest.getInputVertices()) {
+      getCriticalPath(predecessor, incomingVertex, time, result);
+    }
+
+    result.put(predecessor, time);
+  }
+
+  /** Splits a path key produced by getCriticalPath back into its vertex names. */
+  private static List<String> getVertexNames(String criticalPath) {
+    if (Strings.isNullOrEmpty(criticalPath)) {
+      return Lists.newLinkedList();
+    }
+    return Lists.newLinkedList(
+        Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split(criticalPath));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 4a582bb..50fe033 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -18,247 +18,294 @@
package org.apache.tez.analyzer.utils;
import org.apache.commons.io.IOUtils;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.commons.io.output.FileWriterWithEncoding;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep;
+import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
-import org.apache.tez.history.parser.datamodel.TaskInfo;
-import org.apache.tez.history.parser.datamodel.VertexInfo;
-import org.plutext.jaxb.svg11.Line;
-import org.plutext.jaxb.svg11.ObjectFactory;
-import org.plutext.jaxb.svg11.Svg;
-import org.plutext.jaxb.svg11.Title;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.namespace.QName;
-import java.io.BufferedReader;
+import com.google.common.base.Joiner;
+
import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.TreeSet;
+import java.text.DecimalFormat;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
public class SVGUtils {
- private static final String UTF8 = "UTF-8";
-
- private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class);
-
-
- private final ObjectFactory objectFactory;
- private final Svg svg;
- private final QName titleName = new QName("title");
-
+ // Time span that maps onto SCREEN_WIDTH pixels in scaleDown(); assigned by drawCritical().
+ // NOTE(review): static and mutable, so every SVGUtils instance shares it -- confirm instances
+ // are never drawn concurrently.
private static int MAX_DAG_RUNTIME = 0;
private static final int SCREEN_WIDTH = 1800;
- private final DagInfo dagInfo;
-
- //Gap between various components
- private static final int DAG_GAP = 70;
- private static final int VERTEX_GAP = 50;
- private static final int TASK_GAP = 5;
- private static final int STROKE_WIDTH = 5;
-
- //To compute the size of the graph.
- private long MIN_X = Long.MAX_VALUE;
- private long MAX_X = Long.MIN_VALUE;
+ /** Creates an empty drawing; saveCriticalPathAsSVG() renders into it and writes the file. */
+ public SVGUtils() {
+ }
- private int x1 = 0;
- private int y1 = 0;
- private int y2 = 0;
+ // Largest x/y produced by addOffsetX/addOffsetY; saveFileStr() writes them as width/height.
+ private int Y_MAX;
+ private int X_MAX;
+ // Formats seconds with up to two decimals for getTimeStr(). NOTE(review): DecimalFormat is
+ // not thread-safe and this instance is shared by the static getTimeStr().
+ private static final DecimalFormat secondFormat = new DecimalFormat("#.##");
+ // Margins added to every x/y coordinate by addOffsetX/addOffsetY.
+ private static final int X_BASE = 100;
+ private static final int Y_BASE = 100;
+ // Base stroke width for grid lines; critical-path lines are drawn with TICK*5.
+ private static final int TICK = 1;
+ // Height of one chart row; step i is drawn at y = i * STEP_GAP.
+ private static final int STEP_GAP = 50;
+ // Font size (px) for all text elements.
+ private static final int TEXT_SIZE = 20;
+ // Fill colors: RUNTIME = launch..finish, ALLOCATION_OVERHEAD = creation..allocation,
+ // LAUNCH_OVERHEAD = allocation..launch, VERTEX_INIT_COMMIT = non-attempt steps;
+ // BORDER = rect outlines and grid; CRITICAL = critical-path overlay lines.
+ private static final String RUNTIME_COLOR = "LightGreen";
+ private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod";
+ private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon";
+ private static final String BORDER_COLOR = "Sienna";
+ private static final String VERTEX_INIT_COMMIT_COLOR = "orange";
+ private static final String CRITICAL_COLOR = "IndianRed";
+ // Opacity used for every rect.
+ private static final float RECT_OPACITY = 1.0f;
+ // Separator between the lines of a tooltip title.
+ private static final String TITLE_BR = " ";
- public SVGUtils(DagInfo dagInfo) {
- this.dagInfo = dagInfo;
- this.objectFactory = new ObjectFactory();
- this.svg = objectFactory.createSvg();
+  /**
+   * Formats a duration given in milliseconds as, for example, {@code "1h5m3.25s"}. The hour and
+   * minute parts are left out when they are zero; the seconds part is always present and keeps
+   * up to two decimals.
+   */
+  public static String getTimeStr(final long millis) {
+    final long hours = TimeUnit.MILLISECONDS.toHours(millis);
+    final long wholeMinutes = TimeUnit.MILLISECONDS.toMinutes(millis);
+    final long minutes = wholeMinutes - TimeUnit.HOURS.toMinutes(hours);
+    final long remainderMillis = millis - TimeUnit.MINUTES.toMillis(wholeMinutes);
+
+    final StringBuilder out = new StringBuilder();
+    if (hours != 0) {
+      out.append(hours).append('h');
+    }
+    if (minutes != 0) {
+      out.append(minutes).append('m');
+    }
+    out.append(secondFormat.format(remainderMillis / 1000.0)).append('s');
+    return out.toString();
}
-
- private Line createLine(int x1, int y1, int x2, int y2) {
- Line line = objectFactory.createLine();
- line.setX1(scaleDown(x1) + "");
- line.setY1(y1 + "");
- line.setX2(scaleDown(x2) + "");
- line.setY2(y2 + "");
- return line;
+
+  /** SVG element strings appended by the add*Str helpers and written out by saveFileStr(). */
+  List<String> svgLines = new LinkedList<>();
+
+  /** Shifts x by the left margin, records the result in X_MAX if larger, and returns it. */
+  private int addOffsetX(int x) {
+    final int shifted = X_BASE + x;
+    if (shifted > X_MAX) {
+      X_MAX = shifted;
+    }
+    return shifted;
}
-
- private Title createTitle(String msg) {
- Title t = objectFactory.createTitle();
- t.setContent(msg);
- return t;
+
+  /** Shifts y by the top margin, records the result in Y_MAX if larger, and returns it. */
+  private int addOffsetY(int y) {
+    final int shifted = Y_BASE + y;
+    if (shifted > Y_MAX) {
+      Y_MAX = shifted;
+    }
+    return shifted;
}
-
- private Title createTitleForVertex(VertexInfo vertex) {
- String titleStr = vertex.getVertexName() + ":"
- + (vertex.getFinishTimeInterval())
- + " ms, RelativeTimeToDAG:"
- + (vertex.getInitTime() - this.dagInfo.getStartTime())
- + " ms, counters:" + vertex.getTezCounters();
- Title title = createTitle(titleStr);
- return title;
+
+  /**
+   * Scales a time interval to horizontal pixels so that MAX_DAG_RUNTIME spans SCREEN_WIDTH.
+   * Returns 0 when MAX_DAG_RUNTIME is not positive. Dividing by it would otherwise yield
+   * NaN/Infinity, which Math.round turns into 0 or Integer.MAX_VALUE, and the latter overflows
+   * once the margin is added.
+   */
+  private int scaleDown(int len) {
+    if (MAX_DAG_RUNTIME <= 0) {
+      return 0;
+    }
+    return Math.round((len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH);
}
-
- private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) {
- String titleStr = "RelativeTimeToVertex:"
- + (taskAttemptInfo.getStartTime() -
- taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) +
- " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters();
- Title title = createTitle(titleStr);
- return title;
+
+  /**
+   * Appends a {@code <rect>} element with a tooltip. x and width are time intervals scaled by
+   * scaleDown(); y and height are used unscaled. The origin goes through addOffsetX/addOffsetY,
+   * which add the margins and grow X_MAX/Y_MAX.
+   */
+  private void addRectStr(int x, int width, int y, int height,
+      String fillColor, String borderColor, float opacity, String title) {
+    String rectStyle = "stroke: " + borderColor + "; fill: " + fillColor + "; opacity: " + opacity;
+    // Titles embed attempt/vertex names, reasons and notes, so escape XML markup characters.
+    String safeTitle = String.valueOf(title)
+        .replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
+    String rectStr = "<rect x=\"" + addOffsetX(scaleDown(x)) + "\""
+        + " y=\"" + addOffsetY(y) + "\""
+        // A negative width is an SVG error; it occurs when an end time precedes its start time.
+        + " width=\"" + Math.max(0, scaleDown(width)) + "\""
+        + " height=\"" + height + "\""
+        + " style=\"" + rectStyle + "\""
+        + " >"
+        + " <title>" + safeTitle + "</title>"
+        + " </rect>";
+    svgLines.add(rectStr);
}
-
- /**
- * Draw DAG from dagInfo
- *
- * @param dagInfo
- */
- private void drawDAG(DagInfo dagInfo) {
- Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms");
- int duration = (int) dagInfo.getFinishTimeInterval();
- MAX_DAG_RUNTIME = duration;
- MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X);
- MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X);
- Line line = createLine(x1, y1, x1 + duration, y2);
- line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title));
- line.setStyle("stroke: black; stroke-width:20");
- line.setOpacity("0.3");
- svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
- drawVertex();
+
+  /**
+   * Appends a {@code <text>} element with a tooltip. x is a time interval scaled by scaleDown();
+   * y is used unscaled. anchor is written as the text-anchor style value and size as the font
+   * size in px.
+   */
+  private void addTextStr(int x, int y, String text, String anchor, int size, String title) {
+    String textStyle = "text-anchor: " + anchor + "; font-size: " + size + "px;";
+    // text and title carry DAG, vertex and attempt names; escape XML markup characters.
+    String safeText = String.valueOf(text)
+        .replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
+    String safeTitle = String.valueOf(title)
+        .replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
+    String textStr = "<text x=\"" + addOffsetX(scaleDown(x)) + "\" "
+        + "y=\"" + addOffsetY(y) + "\" "
+        + "style=\"" + textStyle + "\" transform=\"\">"
+        + safeText
+        + " <title>" + safeTitle + "</title>"
+        + "</text>";
+    svgLines.add(textStr);
}
-
- private Collection<VertexInfo> getSortedVertices() {
- Collection<VertexInfo> vertices = this.dagInfo.getVertices();
- // Add corresponding vertex details
- TreeSet<VertexInfo> vertexSet = new TreeSet<VertexInfo>(
- new Comparator<VertexInfo>() {
- @Override
- public int compare(VertexInfo o1, VertexInfo o2) {
- return (int) (o1.getFirstTaskStartTimeInterval() - o2.getFirstTaskStartTimeInterval());
- }
- });
- vertexSet.addAll(vertices);
- return vertexSet;
+
+  /**
+   * Appends a {@code <line>} element with a tooltip. The x coordinates are time intervals scaled
+   * by scaleDown(); the y coordinates are used unscaled. width is the stroke width.
+   */
+  private void addLineStr(int x1, int y1, int x2, int y2, String color, String title, int width) {
+    String style = "stroke: " + color + "; stroke-width:" + width;
+    // Titles embed attempt and vertex names; escape XML markup characters.
+    String safeTitle = String.valueOf(title)
+        .replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
+    String str = "<line x1=\"" + addOffsetX(scaleDown(x1)) + "\""
+        + " y1=\"" + addOffsetY(y1) + "\""
+        + " x2=\"" + addOffsetX(scaleDown(x2)) + "\""
+        + " y2=\"" + addOffsetY(y2) + "\""
+        + " style=\"" + style + "\""
+        + " >"
+        + " <title>" + safeTitle + "</title>"
+        + " </line>";
+    svgLines.add(str);
}
-
- private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) {
- Collection<TaskInfo> tasks = vertexInfo.getTasks();
- // Add corresponding task details
- TreeSet<TaskInfo> taskSet = new TreeSet<TaskInfo>(new Comparator<TaskInfo>() {
- @Override
- public int compare(TaskInfo o1, TaskInfo o2) {
- return (int) (o1.getSuccessfulTaskAttempt().getStartTimeInterval()
- - o2.getSuccessfulTaskAttempt().getStartTimeInterval());
+
+  /**
+   * Draws one critical-path step as a row at y = yOffset * STEP_GAP, with times taken relative
+   * to dagStartTime.
+   * <p/>
+   * A VERTEX_INIT step, or any other non-attempt step (labelled "Output Commit"), becomes one box
+   * spanning its critical time. An attempt step becomes three adjacent boxes (creation to
+   * allocation, allocation to launch, launch to finish) labelled with the attempt's short name.
+   * Every box carries a tooltip with the step's details and notes.
+   */
+  public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) {
+    if (step.getType() != EntityType.ATTEMPT) {
+      // draw initial vertex or final commit overhead
+      StringBuilder title = new StringBuilder();
+      String text = null;
+      if (step.getType() == EntityType.VERTEX_INIT) {
+        String vertex = step.getAttempt().getTaskInfo().getVertexInfo().getVertexName();
+        text = vertex + " : Init";
+        title.append(text).append(TITLE_BR);
+      } else {
+        text = "Output Commit";
+        title.append(text).append(TITLE_BR);
}
- });
- taskSet.addAll(tasks);
- return taskSet;
- }
+      title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR);
+      // Separate the critical time from the notes that follow.
+      title.append(
+          "Critical Time: " + getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime()))
+          .append(TITLE_BR);
+      title.append(Joiner.on(TITLE_BR).join(step.getNotes()));
+      String titleStr = title.toString();
+      int stopTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
+      int startTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
+      addRectStr(startTimeInterval,
+          (stopTimeInterval - startTimeInterval), yOffset * STEP_GAP, STEP_GAP,
+          VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+      addTextStr((stopTimeInterval + startTimeInterval) / 2,
+          (yOffset * STEP_GAP + STEP_GAP / 2),
+          text, "middle",
+          TEXT_SIZE, titleStr);
+    } else {
+      TaskAttemptInfo attempt = step.getAttempt();
+      int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
+      int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
+      int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime);
+      int allocationTimeInterval = (int) (attempt.getAllocationTime() - dagStartTime);
+      int launchTimeInterval = (int) (attempt.getStartTime() - dagStartTime);
+      int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime);
+
+      StringBuilder title = new StringBuilder();
+      title.append("Attempt: " + attempt.getTaskAttemptId()).append(TITLE_BR);
+      title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR);
+      title.append("Completion Status: " + attempt.getDetailedStatus()).append(TITLE_BR);
+      title.append(
+          "Critical Time Contribution: " +
+          getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime())).append(TITLE_BR);
+      title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR);
+      title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR);
+      title.append("Created at: " + getTimeStr(creationTimeInterval)).append(TITLE_BR);
+      title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR);
+      title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR);
+      title.append("Finished at: " + getTimeStr(finishTimeInterval)).append(TITLE_BR);
+      title.append(Joiner.on(TITLE_BR).join(step.getNotes()));
+      String titleStr = title.toString();
- /**
- * Draw the vertices
- *
- */
- public void drawVertex() {
- Collection<VertexInfo> vertices = getSortedVertices();
- for (VertexInfo vertex : vertices) {
- //Set vertex start time as the one when its first task attempt started executing
- x1 = (int) vertex.getStartTimeInterval();
- y1 += VERTEX_GAP;
- int duration = ((int) (vertex.getTimeTaken()));
- Line line = createLine(x1, y1, x1 + duration, y1);
- line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH);
- line.setOpacity("0.3");
+      // creation -> allocation: waiting for a container
+      addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval,
+          yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+          titleStr);
- Title vertexTitle = createTitleForVertex(vertex);
- line.getSVGDescriptionClass().add(
- new JAXBElement<Title>(titleName, Title.class, vertexTitle));
- svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
- // For each vertex, draw the tasks
- drawTask(vertex);
+      // allocation -> launch
+      addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval,
+          yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
+          titleStr);
+
+      // launch -> finish: execution
+      addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP,
+          STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+
+      addTextStr((finishTimeInterval + creationTimeInterval) / 2,
+          (yOffset * STEP_GAP + STEP_GAP / 2), attempt.getShortName(), "middle", TEXT_SIZE,
+          titleStr);
}
- x1 = x1 + (int) dagInfo.getFinishTimeInterval();
- y1 = y1 + DAG_GAP;
- y2 = y1;
}
- /**
- * Draw tasks
- *
- * @param vertex
- */
- public void drawTask(VertexInfo vertex) {
- Collection<TaskInfo> tasks = getSortedTasks(vertex);
- for (TaskInfo task : tasks) {
- for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) {
- x1 = (int) taskAttemptInfo.getStartTimeInterval();
- y1 += TASK_GAP;
- int duration = (int) taskAttemptInfo.getTimeTaken();
- Line line = createLine(x1, y1, x1 + duration, y1);
- String color =
- taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name())
- ? "green" : "red";
- line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH);
- Title title = createTitleForTaskAttempt(taskAttemptInfo);
- line.getSVGDescriptionClass().add(
- new JAXBElement<Title>(titleName, Title.class, title));
- svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass()
- .add(line);
+  /**
+   * Draws the whole chart:
+   * <ul>
+   * <li>a time grid with 11 labelled ticks over the DAG's duration, and a title;</li>
+   * <li>one row per critical-path step, via drawStep();</li>
+   * <li>the critical path itself, as thick lines on top of the rows;</li>
+   * <li>a color legend.</li>
+   * </ul>
+   * Also sets MAX_DAG_RUNTIME, which scaleDown() uses for the horizontal scale, and pads
+   * X_MAX/Y_MAX for the right and bottom margins.
+   */
+  private void drawCritical(DagInfo dagInfo, List<CriticalPathStep> criticalPath) {
+    int duration = (int) dagInfo.getFinishTimeInterval();
+    MAX_DAG_RUNTIME = duration;
+    long dagStartTime = dagInfo.getStartTime();
+    int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time
+    int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime);
+
+    // draw grid
+    addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK);
+    int yGrid = (criticalPath.size() + 2) * STEP_GAP;
+    for (int i = 0; i < 11; ++i) {
+      int x = Math.round(((dagFinishTimeInterval - dagStartTimeInterval) / 10.0f) * i);
+      addLineStr(x, 0, x, yGrid, BORDER_COLOR, "", TICK);
+      // SVG text-anchor only accepts start | middle | end ("left" is invalid).
+      addTextStr(x, 0, getTimeStr(x), "start", TEXT_SIZE, "");
+    }
+    addLineStr(dagStartTimeInterval, yGrid, dagFinishTimeInterval, yGrid, BORDER_COLOR, "", TICK);
+    addTextStr((dagFinishTimeInterval + dagStartTimeInterval) / 2, yGrid + STEP_GAP,
+        "Critical Path for " + dagInfo.getName() + " (" + dagInfo.getDagId() + ")", "middle",
+        TEXT_SIZE, "");
+
+    // draw steps
+    for (int i = 1; i <= criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i - 1);
+      drawStep(step, dagStartTime, i);
+    }
+
+    // draw critical path on top
+    for (int i = 1; i <= criticalPath.size(); ++i) {
+      CriticalPathStep step = criticalPath.get(i - 1);
+      boolean isLast = i == criticalPath.size();
+
+      // draw critical path for step, along the bottom edge of its row
+      int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
+      int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
+      addLineStr(startCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval,
+          (i + 1) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(),
+          TICK * 5);
+
+      if (isLast) {
+        // last step. add commit overhead
+        addLineStr(stopCriticalTimeInterval, (i + 1) * STEP_GAP, dagFinishTimeInterval,
+            (i + 1) * STEP_GAP, CRITICAL_COLOR,
+            "Critical Time " + step.getAttempt().getTaskInfo().getVertexInfo().getVertexName(),
+            TICK * 5);
+      } else {
+        // connect to next step in critical path
+        addLineStr(stopCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval,
+            (i + 2) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(),
+            TICK * 5);
}
}
+
+    // draw legend below the title text, which sits at yGrid + STEP_GAP. Each swatch is one grid
+    // tick wide, so it scales with the DAG duration instead of a fixed time span.
+    int legendX = 0;
+    int legendY = yGrid + 2 * STEP_GAP;
+    int legendWidth = Math.round((dagFinishTimeInterval - dagStartTimeInterval) / 10.0f);
+
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR,
+        RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP / 2, "Vertex Init/Commit Overhead", "start", TEXT_SIZE,
+        "");
+    legendY += STEP_GAP;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR,
+        RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP / 2, "Task Allocation Overhead", "start", TEXT_SIZE,
+        "");
+    legendY += STEP_GAP;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR,
+        RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP / 2, "Task Launch Overhead", "start", TEXT_SIZE, "");
+    legendY += STEP_GAP;
+    addRectStr(legendX, legendWidth, legendY, STEP_GAP, RUNTIME_COLOR, BORDER_COLOR,
+        RECT_OPACITY, "");
+    addTextStr(legendX, legendY + STEP_GAP / 2, "Task Execution Time", "start", TEXT_SIZE, "");
+
+    // leave room for the right and bottom margins
+    Y_MAX += Y_BASE * 2;
+    X_MAX += X_BASE * 2;
}
-
- /**
- * Convert DAG to graph
- *
- * @throws java.io.IOException
- * @throws javax.xml.bind.JAXBException
- */
- public void saveAsSVG(String fileName) throws IOException, JAXBException {
- drawDAG(dagInfo);
- svg.setHeight("" + y2);
- svg.setWidth("" + (MAX_X - MIN_X));
- String tempFileName = System.nanoTime() + ".svg";
- File file = new File(tempFileName);
- JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class);
- Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
- jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
- jaxbMarshaller.marshal(svg, file);
- //TODO: dirty workaround to get rid of XMLRootException issue
- BufferedReader reader = new BufferedReader(
- new InputStreamReader(new FileInputStream(file), UTF8));
- BufferedWriter writer = new BufferedWriter(
- new OutputStreamWriter(new FileOutputStream(fileName), UTF8));
+
+  /**
+   * Renders the critical path of a DAG as an SVG timeline and writes it to fileName.
+   * <p/>
+   * Elements and extents left over from a previous call on this instance are discarded first, so
+   * one SVGUtils can be reused for several DAGs without the drawings being merged.
+   *
+   * @param dagInfo the DAG whose start and finish times define the time axis
+   * @param fileName path of the SVG file to write (UTF-8)
+   * @param criticalPath steps of the critical path, drawn one row per step in list order
+   * @throws RuntimeException wrapping any IOException raised while writing the file
+   */
+  public void saveCriticalPathAsSVG(DagInfo dagInfo,
+      String fileName, List<CriticalPathStep> criticalPath) {
+    svgLines.clear();
+    X_MAX = 0;
+    Y_MAX = 0;
+    drawCritical(dagInfo, criticalPath);
+    saveFileStr(fileName);
+  }
+
+  /**
+   * Writes the collected SVG elements to fileName in UTF-8. They are wrapped in an SVG document
+   * whose height and width are Y_MAX and X_MAX.
+   *
+   * @throws RuntimeException wrapping any IOException, including one raised while flushing and
+   *     closing the file
+   */
+  private void saveFileStr(String fileName) {
+    String header = "<?xml version=\"1.0\" standalone=\"no\"?> "
+        + "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\" "
+        + "\"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">"
+        + "<svg xmlns=\"http://www.w3.org/2000/svg\" version=\"1.1\" "
+        + "xmlns:xlink=\"http://www.w3.org/1999/xlink\" "
+        + "height=\"" + Y_MAX + "\" "
+        + "width=\"" + X_MAX + "\"> "
+        + "<script type=\"text/ecmascript\" "
+        + "xlink:href=\"http://code.jquery.com/jquery-2.1.1.min.js\" />";
+    String footer = "</svg>";
+    String newline = System.getProperty("line.separator");
+    BufferedWriter writer = null;
try {
- while (reader.ready()) {
- String line = reader.readLine();
- if (line != null) {
- line = line.replaceAll(
- " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"", "");
- writer.write(line);
- writer.newLine();
- }
+      writer = new BufferedWriter(new FileWriterWithEncoding(fileName, "UTF-8"));
+      writer.write(header);
+      writer.write(newline);
+      for (String str : svgLines) {
+        writer.write(str);
+        writer.write(newline);
}
+      writer.write(footer);
+      // Close explicitly so a failure to flush the buffered output is reported. Otherwise
+      // closeQuietly() below would swallow it; on an already-closed writer it is a no-op.
+      writer.close();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to write SVG file " + fileName, e);
} finally {
- IOUtils.closeQuietly(reader);
- IOUtils.closeQuietly(writer);
- if (file.exists()) {
- boolean deleted = file.delete();
- LOG.debug("Deleted {}" + file.getAbsolutePath());
+      if (writer != null) {
+        IOUtils.closeQuietly(writer);
}
}
- }
- private float scaleDown(int len) {
- return (len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH;
}
+
}
\ No newline at end of file