Posted to commits@tez.apache.org by rb...@apache.org on 2015/08/11 14:46:18 UTC
[1/2] tez git commit: TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master eadbfec44 -> ecd90dc1f
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
index 8df40ba..a570493 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -18,6 +18,7 @@
package org.apache.tez.analyzer.plugins;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.Analyzer;
@@ -43,15 +44,21 @@ import java.util.Map;
*/
public class ShuffleTimeAnalyzer implements Analyzer {
- private static final String SHUFFLE_TIME_RATIO = "tez.shuffle-time-analyzer.shuffle.ratio";
- private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f;
+ /**
+ * ratio of (total time taken by task - shuffle time) / (total time taken by task)
+ */
+ private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio";
+ private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f;
+ /**
+ * Number of min records that needs to get in as reduce input records.
+ */
private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
"Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
- "Time taken to receive all events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
+ "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
"TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
"SHUFFLE_BYTES_DISK_DIRECT" };
@@ -59,15 +66,15 @@ public class ShuffleTimeAnalyzer implements Analyzer {
private final Configuration config;
- private final float shuffleTimeRatio;
+ private final float realWorkDoneRatio;
private final long minShuffleRecords;
public ShuffleTimeAnalyzer(Configuration config) {
this.config = config;
- shuffleTimeRatio = config.getFloat
- (SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT);
+ realWorkDoneRatio = config.getFloat
+ (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
}
@@ -105,15 +112,20 @@ public class ShuffleTimeAnalyzer implements Analyzer {
result.add(counterGroupName);
//Real work done in the task
- long timeTakenForRealWork = attemptInfo.getTimeTaken() -
- Long.parseLong(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName,
- attemptInfo));
-
String comments = "";
- if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < shuffleTimeRatio) {
- comments = "Time taken in shuffle is more than the actual work being done in task. "
- + " Check if source/destination machine is a slow node. Check if merge phase "
- + "time is more to understand disk bottlenecks in this node. Check for skew";
+ String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME,
+ counterGroupName, attemptInfo);
+ String timeTakenForRealWork = "";
+ if (!Strings.isNullOrEmpty(mergePhaseTime)) {
+ long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime);
+
+ if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio) {
+ comments = "Time taken in shuffle is more than the actual work being done in task. "
+ + " Check if source/destination machine is a slow node. Check if merge phase "
+ + "time is more to understand disk bottlenecks in this node. Check for skew";
+ }
+
+ timeTakenForRealWork = Long.toString(realWorkDone);
}
result.add(comments);
@@ -122,13 +134,14 @@ public class ShuffleTimeAnalyzer implements Analyzer {
result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal));
result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));
+ result.add(Long.toString(attemptInfo.getTimeTaken()));
+
//Total time taken for receiving all events from source tasks
result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo));
-
- result.add(Long.toString(timeTakenForRealWork));
+ result.add(timeTakenForRealWork);
result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo));
result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo));
@@ -150,11 +163,16 @@ public class ShuffleTimeAnalyzer implements Analyzer {
* @return String
*/
private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
- long firstEventReceived = Long.parseLong(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
- counterGroupName, attemptInfo));
- long lastEventReceived = Long.parseLong(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
- counterGroupName, attemptInfo));
- return Long.toString(lastEventReceived - firstEventReceived);
+ String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+ counterGroupName, attemptInfo);
+ String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+ counterGroupName, attemptInfo);
+
+ if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived)) {
+ return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived));
+ } else {
+ return "";
+ }
}
private String getCounterValue(TaskCounter counter, String counterGroupName,
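For reference, a minimal sketch of how the renamed knobs could be wired up when running this analyzer (the DagInfo is assumed to come from one of the history parsers; the values below are illustrative, not part of this patch):

    Configuration conf = new Configuration();
    // Flag attempts where less than 40% of the attempt time is real work (default 0.5).
    conf.setFloat("tez.shuffle-time-analyzer.real-work.done.ratio", 0.4f);
    // Ignore attempts with fewer than 50,000 reduce input records (default 10000).
    conf.setLong("tez.shuffle-time-analyzer.shuffle.min.records", 50000L);
    ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(conf);
    analyzer.analyze(dagInfo);               // dagInfo parsed from ATS or simple history logs
    CSVResult result = analyzer.getResult(); // one row per qualifying task attempt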
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
index 8152344..f09380d 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -57,6 +57,10 @@ import java.util.Map;
*/
public class SkewAnalyzer implements Analyzer {
+ /**
+ * Number of bytes sent as shuffle bytes from a source. If it is below this threshold,
+ * the attempt is not considered for analysis.
+ */
private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
+ ".bytes.per.source";
private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
index 7c7f5c0..1a8d9d3 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -41,7 +41,7 @@ import java.util.List;
public class SlowTaskIdentifier implements Analyzer {
private static final String[] headers = { "vertexName", "taskAttemptId",
- "Node", "taskDuration", "Status",
+ "Node", "taskDuration", "Status", "diagnostics",
"NoOfInputs" };
private final CSVResult csvResult;
@@ -72,14 +72,21 @@ public class SlowTaskIdentifier implements Analyzer {
}
});
- int limit = config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT);
- for(int i=0;i<limit;i++) {
+ int limit = Math.min(taskAttempts.size(),
+ Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+
+ if (limit == 0) {
+ return;
+ }
+
+ for (int i = 0; i < limit; i++) {
List<String> record = Lists.newLinkedList();
record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName());
record.add(taskAttempts.get(i).getTaskAttemptId());
record.add(taskAttempts.get(i).getContainer().getHost());
record.add(taskAttempts.get(i).getTimeTaken() + "");
record.add(taskAttempts.get(i).getStatus());
+ record.add(taskAttempts.get(i).getDiagnostics());
record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + "");
csvResult.addRecord(record.toArray(new String[record.size()]));
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
index b7fca0b..c8d9695 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
import java.util.List;
@@ -41,7 +42,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
"shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom",
- "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+ "75thPercentile", "95thPercentile", "98thPercentile", "Median",
"observation", "comments" };
private final CSVResult csvResult = new CSVResult(headers);
@@ -50,8 +51,27 @@ public class SlowestVertexAnalyzer implements Analyzer {
private final MetricRegistry metrics = new MetricRegistry();
private Histogram taskAttemptRuntimeHistorgram;
+ private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime";
+ private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000;
+
+ private final long vertexRuntimeThreshold;
+
public SlowestVertexAnalyzer(Configuration config) {
this.config = config;
+ this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
+ MAX_VERTEX_RUNTIME_DEFAULT));
+
+ }
+
+ private long getTaskRuntime(VertexInfo vertexInfo) {
+ TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart();
+ TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish();
+
+ DagInfo dagInfo = vertexInfo.getDagInfo();
+ long totalTime = ((lastTaskToFinish == null) ?
+ dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) -
+ ((firstTaskToStart == null) ? dagInfo.getStartTime() : firstTaskToStart.getStartTime());
+ return totalTime;
}
@Override
@@ -59,9 +79,13 @@ public class SlowestVertexAnalyzer implements Analyzer {
for (VertexInfo vertexInfo : dagInfo.getVertices()) {
String vertexName = vertexInfo.getVertexName();
- long totalTime = vertexInfo.getTimeTaken();
+ if (vertexInfo.getFirstTaskToStart() == null || vertexInfo.getLastTaskToFinish() == null) {
+ continue;
+ }
+
+ long totalTime = getTaskRuntime(vertexInfo);
- long max = Long.MIN_VALUE;
+ long slowestLastEventTime = Long.MIN_VALUE;
String maxSourceName = "";
taskAttemptRuntimeHistorgram = metrics.histogram(vertexName);
@@ -81,10 +105,8 @@ public class SlowestVertexAnalyzer implements Analyzer {
continue;
}
//Find the slowest last event received
- if (entry.getValue().getValue() > max) {
- //w.r.t vertex start time.
- max =(attemptInfo.getStartTimeInterval() + entry.getValue().getValue()) -
- (vertexInfo.getStartTimeInterval());
+ if (entry.getValue().getValue() > slowestLastEventTime) {
+ slowestLastEventTime = entry.getValue().getValue();
maxSourceName = entry.getKey();
}
}
@@ -104,9 +126,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
}
//Find the slowest last event received
if (entry.getValue().getValue() > shuffleMax) {
- //w.r.t vertex start time.
- shuffleMax =(attemptInfo.getStartTimeInterval() + entry.getValue().getValue()) -
- (vertexInfo.getStartTimeInterval());
+ shuffleMax = entry.getValue().getValue();
shuffleMaxSource = entry.getKey();
}
}
@@ -120,9 +140,10 @@ public class SlowestVertexAnalyzer implements Analyzer {
record.add(totalTime + "");
record.add(Math.max(0, shuffleMax) + "");
record.add(shuffleMaxSource);
- record.add(Math.max(0, max) + "");
+ record.add(Math.max(0, slowestLastEventTime) + "");
record.add(maxSourceName);
- record.add(Math.max(0,(totalTime - max)) + "");
+ //Finding out real_work done at vertex level might be meaningless (as it is quite possible
+ // that it went to starvation).
StringBuilder sb = new StringBuilder();
double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
@@ -145,7 +166,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
if ((shuffleMax * 1.0f / totalTime) > 0.5) {
- if ((max * 1.0f / totalTime) > 0.5) {
+ if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) {
comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
+ " event received";
} else {
@@ -153,8 +174,9 @@ public class SlowestVertexAnalyzer implements Analyzer {
"Spending too much time on shuffle. Check shuffle bytes from previous vertex";
}
} else {
- if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later.
- comments = "Concentrate on this vertex (totalTime > 10 seconds)";
+ if (totalTime > vertexRuntimeThreshold) { //greater than the configured threshold (in ms).
+ comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold
+ + " ms)";
}
}
}
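The new threshold is compared against the vertex runtime computed by getTaskRuntime(), which is a difference of task start/finish timestamps, so it appears to be in milliseconds (default 100000). A hedged example of only commenting on vertices that run longer than five minutes:

    Configuration conf = new Configuration();
    conf.setLong("tez.slowest-vertex-analyzer.max.vertex.runtime", 5 * 60 * 1000L); // 5 minutes, in ms
    SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(conf);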
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
index c650104..83b1bb0 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -49,12 +49,21 @@ public class SpillAnalyzerImpl implements Analyzer {
private final CSVResult csvResult;
- private static long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024l;
+ /**
+ * Minimum output bytes that should be churned out by a task.
+ */
+ private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes"
+ + ".threshold";
+ private static long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1 * 1024 * 1024 * 1024l;
+
+ private final long minOutputBytesPerTask;
private final Configuration config;
public SpillAnalyzerImpl(Configuration config) {
this.config = config;
+ minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
+ OUTPUT_BYTES_THRESHOLD_DEFAULT));
this.csvResult = new CSVResult(headers);
}
@@ -83,7 +92,7 @@ public class SpillAnalyzerImpl implements Analyzer {
long outputRecords = outputRecordsMap.get(source).getValue();
long spilledRecords = spilledRecordsMap.get(source).getValue();
- if (spillCount > 1 && outBytes > OUTPUT_BYTES_THRESHOLD) {
+ if (spillCount > 1 && outBytes > minOutputBytesPerTask) {
List<String> recorList = Lists.newLinkedList();
recorList.add(vertexName);
recorList.add(attemptInfo.getTaskAttemptId());
@@ -95,7 +104,7 @@ public class SpillAnalyzerImpl implements Analyzer {
recorList.add(outputRecords + "");
recorList.add(spilledRecords + "");
recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
- + ", try increasing container size.");
+ + ". Try increasing container size.");
csvResult.addRecord(recorList.toArray(new String[recorList.size()]));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
new file mode 100644
index 0000000..c07ff83
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.TreeMultiset;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Analyze concurrent tasks running in every vertex at regular intervals.
+ */
+public class TaskConcurrencyAnalyzer implements Analyzer {
+
+ private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
+
+ private final CSVResult csvResult;
+ private final Configuration config;
+
+ public TaskConcurrencyAnalyzer(Configuration conf) {
+ this.csvResult = new CSVResult(headers);
+ this.config = conf;
+ }
+
+ private enum EventType {START, FINISH}
+
+ static class TimeInfo {
+ EventType eventType;
+ long timestamp;
+ int concurrentTasks;
+
+ public TimeInfo(EventType eventType, long timestamp) {
+ this.eventType = eventType;
+ this.timestamp = timestamp;
+ }
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+
+ //For each vertex find the concurrent tasks running at any point
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ List<TaskAttemptInfo> taskAttempts =
+ Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null));
+
+ String vertexName = vertexInfo.getVertexName();
+
+ /**
+ * - Get sorted multi-set of timestamps (S1, S2,...E1, E2..). Possible to have multiple
+ * tasks starting/ending at the same time.
+ * - Walk through the set
+ * - Increment concurrent tasks when start event is encountered
+ * - Decrement concurrent tasks when finish event is encountered
+ */
+ TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() {
+ @Override public int compare(TimeInfo o1, TimeInfo o2) {
+ return (o1.timestamp < o2.timestamp) ? -1 :
+ ((o1.timestamp == o2.timestamp) ? 0 : 1);
+ }
+ });
+
+ for (TaskAttemptInfo attemptInfo : taskAttempts) {
+ TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime());
+ TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime());
+
+ timeInfoSet.add(startTimeInfo);
+ timeInfoSet.add(stopTimeInfo);
+ }
+
+ //Compute concurrent tasks in the list now.
+ int concurrentTasks = 0;
+ for(TimeInfo timeInfo : timeInfoSet.elementSet()) {
+ switch (timeInfo.eventType) {
+ case START:
+ concurrentTasks += timeInfoSet.count(timeInfo);
+ break;
+ case FINISH:
+ concurrentTasks -= timeInfoSet.count(timeInfo);
+ break;
+ default:
+ break;
+ }
+ timeInfo.concurrentTasks = concurrentTasks;
+ addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks);
+ }
+ }
+ }
+
+ private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
+ String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
+ csvResult.addRecord(record);
+ }
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "TaskConcurrencyAnalyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Analyze how many tasks were running in every vertex at given point in time. This "
+ + "would be helpful in understanding whether any starvation was there or not.";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+}
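To illustrate the sweep over the multiset: with three attempts whose (start, finish) times are (0, 10), (0, 12) and (11, 20), the ordered element set is 0 (two STARTs), 10 (FINISH), 11 (START), 12 (FINISH), 20 (FINISH), and the emitted (time, concurrentTasksRunning) records are (0, 2), (10, 1), (11, 2), (12, 1), (20, 0).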
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
new file mode 100644
index 0000000..4a582bb
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.plutext.jaxb.svg11.Line;
+import org.plutext.jaxb.svg11.ObjectFactory;
+import org.plutext.jaxb.svg11.Svg;
+import org.plutext.jaxb.svg11.Title;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.namespace.QName;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.TreeSet;
+
+public class SVGUtils {
+
+ private static final String UTF8 = "UTF-8";
+
+ private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class);
+
+
+ private final ObjectFactory objectFactory;
+ private final Svg svg;
+ private final QName titleName = new QName("title");
+
+ private static int MAX_DAG_RUNTIME = 0;
+ private static final int SCREEN_WIDTH = 1800;
+
+ private final DagInfo dagInfo;
+
+ //Gap between various components
+ private static final int DAG_GAP = 70;
+ private static final int VERTEX_GAP = 50;
+ private static final int TASK_GAP = 5;
+ private static final int STROKE_WIDTH = 5;
+
+ //To compute the size of the graph.
+ private long MIN_X = Long.MAX_VALUE;
+ private long MAX_X = Long.MIN_VALUE;
+
+ private int x1 = 0;
+ private int y1 = 0;
+ private int y2 = 0;
+
+ public SVGUtils(DagInfo dagInfo) {
+ this.dagInfo = dagInfo;
+ this.objectFactory = new ObjectFactory();
+ this.svg = objectFactory.createSvg();
+ }
+
+ private Line createLine(int x1, int y1, int x2, int y2) {
+ Line line = objectFactory.createLine();
+ line.setX1(scaleDown(x1) + "");
+ line.setY1(y1 + "");
+ line.setX2(scaleDown(x2) + "");
+ line.setY2(y2 + "");
+ return line;
+ }
+
+ private Title createTitle(String msg) {
+ Title t = objectFactory.createTitle();
+ t.setContent(msg);
+ return t;
+ }
+
+ private Title createTitleForVertex(VertexInfo vertex) {
+ String titleStr = vertex.getVertexName() + ":"
+ + (vertex.getFinishTimeInterval())
+ + " ms, RelativeTimeToDAG:"
+ + (vertex.getInitTime() - this.dagInfo.getStartTime())
+ + " ms, counters:" + vertex.getTezCounters();
+ Title title = createTitle(titleStr);
+ return title;
+ }
+
+ private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) {
+ String titleStr = "RelativeTimeToVertex:"
+ + (taskAttemptInfo.getStartTime() -
+ taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) +
+ " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters();
+ Title title = createTitle(titleStr);
+ return title;
+ }
+
+ /**
+ * Draw DAG from dagInfo
+ *
+ * @param dagInfo
+ */
+ private void drawDAG(DagInfo dagInfo) {
+ Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms");
+ int duration = (int) dagInfo.getFinishTimeInterval();
+ MAX_DAG_RUNTIME = duration;
+ MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X);
+ MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X);
+ Line line = createLine(x1, y1, x1 + duration, y2);
+ line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title));
+ line.setStyle("stroke: black; stroke-width:20");
+ line.setOpacity("0.3");
+ svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
+ drawVertex();
+ }
+
+ private Collection<VertexInfo> getSortedVertices() {
+ Collection<VertexInfo> vertices = this.dagInfo.getVertices();
+ // Add corresponding vertex details
+ TreeSet<VertexInfo> vertexSet = new TreeSet<VertexInfo>(
+ new Comparator<VertexInfo>() {
+ @Override
+ public int compare(VertexInfo o1, VertexInfo o2) {
+ return (int) (o1.getFirstTaskStartTimeInterval() - o2.getFirstTaskStartTimeInterval());
+ }
+ });
+ vertexSet.addAll(vertices);
+ return vertexSet;
+ }
+
+ private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) {
+ Collection<TaskInfo> tasks = vertexInfo.getTasks();
+ // Add corresponding task details
+ TreeSet<TaskInfo> taskSet = new TreeSet<TaskInfo>(new Comparator<TaskInfo>() {
+ @Override
+ public int compare(TaskInfo o1, TaskInfo o2) {
+ return (int) (o1.getSuccessfulTaskAttempt().getStartTimeInterval()
+ - o2.getSuccessfulTaskAttempt().getStartTimeInterval());
+ }
+ });
+ taskSet.addAll(tasks);
+ return taskSet;
+ }
+
+ /**
+ * Draw the vertices
+ *
+ */
+ public void drawVertex() {
+ Collection<VertexInfo> vertices = getSortedVertices();
+ for (VertexInfo vertex : vertices) {
+ //Set vertex start time as the one when its first task attempt started executing
+ x1 = (int) vertex.getStartTimeInterval();
+ y1 += VERTEX_GAP;
+ int duration = ((int) (vertex.getTimeTaken()));
+ Line line = createLine(x1, y1, x1 + duration, y1);
+ line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH);
+ line.setOpacity("0.3");
+
+ Title vertexTitle = createTitleForVertex(vertex);
+ line.getSVGDescriptionClass().add(
+ new JAXBElement<Title>(titleName, Title.class, vertexTitle));
+ svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
+ // For each vertex, draw the tasks
+ drawTask(vertex);
+ }
+ x1 = x1 + (int) dagInfo.getFinishTimeInterval();
+ y1 = y1 + DAG_GAP;
+ y2 = y1;
+ }
+
+ /**
+ * Draw tasks
+ *
+ * @param vertex
+ */
+ public void drawTask(VertexInfo vertex) {
+ Collection<TaskInfo> tasks = getSortedTasks(vertex);
+ for (TaskInfo task : tasks) {
+ for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) {
+ x1 = (int) taskAttemptInfo.getStartTimeInterval();
+ y1 += TASK_GAP;
+ int duration = (int) taskAttemptInfo.getTimeTaken();
+ Line line = createLine(x1, y1, x1 + duration, y1);
+ String color =
+ taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name())
+ ? "green" : "red";
+ line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH);
+ Title title = createTitleForTaskAttempt(taskAttemptInfo);
+ line.getSVGDescriptionClass().add(
+ new JAXBElement<Title>(titleName, Title.class, title));
+ svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass()
+ .add(line);
+ }
+ }
+ }
+
+ /**
+ * Convert DAG to graph
+ *
+ * @throws java.io.IOException
+ * @throws javax.xml.bind.JAXBException
+ */
+ public void saveAsSVG(String fileName) throws IOException, JAXBException {
+ drawDAG(dagInfo);
+ svg.setHeight("" + y2);
+ svg.setWidth("" + (MAX_X - MIN_X));
+ String tempFileName = System.nanoTime() + ".svg";
+ File file = new File(tempFileName);
+ JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class);
+ Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
+ jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+ jaxbMarshaller.marshal(svg, file);
+ //TODO: dirty workaround to get rid of XMLRootException issue
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(new FileInputStream(file), UTF8));
+ BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(new FileOutputStream(fileName), UTF8));
+ try {
+ while (reader.ready()) {
+ String line = reader.readLine();
+ if (line != null) {
+ line = line.replaceAll(
+ " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"", "");
+ writer.write(line);
+ writer.newLine();
+ }
+ }
+ } finally {
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(writer);
+ if (file.exists()) {
+ boolean deleted = file.delete();
+ LOG.debug("Deleted {}" + file.getAbsolutePath());
+ }
+ }
+ }
+
+ private float scaleDown(int len) {
+ return (len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH;
+ }
+}
\ No newline at end of file
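A hedged usage sketch for the new SVG timeline (the output file name is illustrative; saveAsSVG declares IOException and JAXBException):

    SVGUtils svgUtils = new SVGUtils(dagInfo);
    svgUtils.saveAsSVG("dag-timeline.svg");  // one swimlane per vertex, green/red lines per task attempt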
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
new file mode 100644
index 0000000..8bcf265
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.utils;
+
+import com.sun.istack.Nullable;
+import org.apache.tez.dag.utils.Graph;
+import org.apache.tez.history.parser.datamodel.AdditionalInputOutputDetails;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Utils {
+
+ private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+
+ public static String getShortClassName(String className) {
+ int pos = className.lastIndexOf(".");
+ if (pos != -1 && pos < className.length() - 1) {
+ return className.substring(pos + 1);
+ }
+ return className;
+ }
+
+ public static String sanitizeLabelForViz(String label) {
+ Matcher m = sanitizeLabelPattern.matcher(label);
+ return m.replaceAll("_");
+ }
+
+ public static void generateDAGVizFile(DagInfo dagInfo, String fileName,
+ @Nullable List<String> criticalVertices) throws IOException {
+ Graph graph = new Graph(sanitizeLabelForViz(dagInfo.getName()));
+
+ for (VertexInfo v : dagInfo.getVertices()) {
+ String nodeLabel = sanitizeLabelForViz(v.getVertexName())
+ + "[" + getShortClassName(v.getProcessorClassName()
+ + ", tasks=" + v.getTasks().size() + ", time=" + v.getTimeTaken() +" ms]");
+ Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getVertexName()), nodeLabel);
+
+ boolean criticalVertex = (criticalVertices != null) ? criticalVertices.contains(v
+ .getVertexName()) : false;
+ if (criticalVertex) {
+ n.setColor("red");
+ }
+
+
+ for (AdditionalInputOutputDetails input : v.getAdditionalInputInfoList()) {
+ Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName())
+ + "_" + sanitizeLabelForViz(input.getName()));
+ inputNode.setLabel(sanitizeLabelForViz(v.getVertexName())
+ + "[" + sanitizeLabelForViz(input.getName()) + "]");
+ inputNode.setShape("box");
+ inputNode.addEdge(n, "Input name=" + input.getName()
+ + " [inputClass=" + getShortClassName(input.getClazz())
+ + ", initializer=" + getShortClassName(input.getInitializer()) + "]");
+ }
+ for (AdditionalInputOutputDetails output : v.getAdditionalOutputInfoList()) {
+ Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName())
+ + "_" + sanitizeLabelForViz(output.getName()));
+ outputNode.setLabel(sanitizeLabelForViz(v.getVertexName())
+ + "[" + sanitizeLabelForViz(output.getName()) + "]");
+ outputNode.setShape("box");
+ n.addEdge(outputNode, "Output name=" + output.getName()
+ + " [outputClass=" + getShortClassName(output.getClazz())
+ + ", committer=" + getShortClassName(output.getInitializer()) + "]");
+ }
+
+ }
+
+ for (EdgeInfo e : dagInfo.getEdges()) {
+ Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName()));
+ n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())),
+ "[input=" + getShortClassName(e.getEdgeSourceClass())
+ + ", output=" + getShortClassName(e.getEdgeDestinationClass())
+ + ", dataMovement=" + e.getDataMovementType().trim() + "]");
+ }
+
+ graph.save(fileName);
+ }
+}
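Similarly, a sketch of emitting the DOT file for a DAG, optionally highlighting slow vertices in red (the vertex names passed are placeholders):

    // Pass null for criticalVertices to skip highlighting.
    Utils.generateDAGVizFile(dagInfo, "dag.dot", Arrays.asList("Map 1", "Reducer 2"));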
[2/2] tez git commit: TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)
Posted by rb...@apache.org.
TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ecd90dc1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ecd90dc1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ecd90dc1
Branch: refs/heads/master
Commit: ecd90dc1f79a4b1614174b75030b85feb8842793
Parents: eadbfec
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Aug 11 18:19:55 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Aug 11 18:19:55 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/dag/utils/Graph.java | 15 +-
.../tez/history/parser/SimpleHistoryParser.java | 239 ++++++
.../history/parser/datamodel/VertexInfo.java | 65 +-
.../apache/tez/history/TestATSFileParser.java | 587 --------------
.../apache/tez/history/TestHistoryParser.java | 785 +++++++++++++++++++
tez-tools/analyzers/job-analyzer/pom.xml | 9 +
.../java/org/apache/tez/analyzer/CSVResult.java | 5 +-
.../analyzer/plugins/CriticalPathAnalyzer.java | 44 +-
.../tez/analyzer/plugins/LocalityAnalyzer.java | 3 +-
.../analyzer/plugins/ShuffleTimeAnalyzer.java | 60 +-
.../tez/analyzer/plugins/SkewAnalyzer.java | 4 +
.../analyzer/plugins/SlowTaskIdentifier.java | 13 +-
.../analyzer/plugins/SlowestVertexAnalyzer.java | 52 +-
.../tez/analyzer/plugins/SpillAnalyzerImpl.java | 15 +-
.../plugins/TaskConcurrencyAnalyzer.java | 138 ++++
.../org/apache/tez/analyzer/utils/SVGUtils.java | 264 +++++++
.../org/apache/tez/analyzer/utils/Utils.java | 100 +++
18 files changed, 1751 insertions(+), 648 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b37eb9e..3de9fb7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@ INCOMPATIBLE CHANGES
TEZ-2699. Internalize strings in ATF parser
ALL CHANGES:
+ TEZ-2692. bugfixes & enhancements related to job parser and analyzer.
TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
TEZ-2630. TezChild receives IP address instead of FQDN.
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
index 6de9c59..eb2bd41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
@@ -62,6 +62,7 @@ public class Graph {
List<Edge> outs;
String label;
String shape;
+ String color;
public Node(String id) {
this(id, null);
@@ -104,6 +105,10 @@ public class Graph {
public void setShape(String shape) {
this.shape = shape;
}
+
+ public void setColor(String color) {
+ this.color = color;
+ }
}
private String name;
@@ -196,17 +201,19 @@ public class Graph {
for (Node n : nodes) {
if (n.shape != null && !n.shape.isEmpty()) {
sb.append(String.format(
- "%s%s [ label = %s, shape = %s ];",
+ "%s%s [ label = %s, shape = %s , color= %s];",
indent,
wrapSafeString(n.getUniqueId()),
wrapSafeString(n.getLabel()),
- wrapSafeString(n.shape)));
+ wrapSafeString(n.shape),
+ wrapSafeString(n.color == null ? "black" : n.color)));
} else {
sb.append(String.format(
- "%s%s [ label = %s ];",
+ "%s%s [ label = %s , color= %s ];",
indent,
wrapSafeString(n.getUniqueId()),
- wrapSafeString(n.getLabel())));
+ wrapSafeString(n.getLabel()),
+ wrapSafeString(n.color == null ? "black" : n.color)));
}
sb.append(System.getProperty("line.separator"));
List<Edge> combinedOuts = combineEdges(n.outs);
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
new file mode 100644
index 0000000..09c010a
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Parser utility to parse data generated by SimpleHistoryLogging into the in-memory data model provided
+ * in org.apache.tez.history.parser.datamodel.
+ * <p/>
+ * <p/>
+ * Most of the information should be available. Minor details like VersionInfo may not be available,
+ * as they are not captured by SimpleHistoryLogging.
+ */
+public class SimpleHistoryParser extends BaseParser {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleHistoryParser.class);
+ private static final String UTF8 = "UTF-8";
+ private final File historyFile;
+
+
+ public SimpleHistoryParser(File historyFile) {
+ super();
+ Preconditions.checkArgument(historyFile.exists(), historyFile + " does not exist");
+ this.historyFile = historyFile;
+ }
+
+ /**
+ * Get in-memory representation of DagInfo
+ *
+ * @return DagInfo
+ * @throws TezException
+ */
+ public DagInfo getDAGData(String dagId) throws TezException {
+ try {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId");
+ dagId = dagId.trim();
+ parseContents(historyFile, dagId);
+ linkParsedContents();
+ return dagInfo;
+ } catch (IOException e) {
+ LOG.error("Error in reading DAG ", e);
+ throw new TezException(e);
+ } catch (JSONException e) {
+ LOG.error("Error in parsing DAG ", e);
+ throw new TezException(e);
+ }
+ }
+
+ private void populateOtherInfo(JSONObject source, JSONObject destination) throws JSONException {
+ if (source == null || destination == null) {
+ return;
+ }
+ for (Iterator it = source.keys(); it.hasNext(); ) {
+ String key = (String) it.next();
+ Object val = source.get(key);
+ destination.put(key, val);
+ }
+ }
+
+ private void populateOtherInfo(JSONObject source, String entityName,
+ Map<String, JSONObject> destMap) throws JSONException {
+ JSONObject destinationJson = destMap.get(entityName);
+ JSONObject destOtherInfo = destinationJson.getJSONObject(Constants.OTHER_INFO);
+ populateOtherInfo(source, destOtherInfo);
+ }
+
+ private void parseContents(File historyFile, String dagId)
+ throws JSONException, FileNotFoundException, TezException {
+ Scanner scanner = new Scanner(historyFile, UTF8);
+ scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR);
+ JSONObject dagJson = null;
+ Map<String, JSONObject> vertexJsonMap = Maps.newHashMap();
+ Map<String, JSONObject> taskJsonMap = Maps.newHashMap();
+ Map<String, JSONObject> attemptJsonMap = Maps.newHashMap();
+ TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+ while (scanner.hasNext()) {
+ String line = scanner.next();
+ JSONObject jsonObject = new JSONObject(line);
+ String entity = jsonObject.getString(Constants.ENTITY);
+ String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
+ switch (entityType) {
+ case Constants.TEZ_DAG_ID:
+ if (!dagId.equals(entity)) {
+ LOG.warn(dagId + " does not match " + entity);
+ continue;
+ }
+ // Club all DAG related information together (DAG_INIT, DAG_FINISH etc). Each of them
+ // would have a set of entities in otherinfo (e.g vertex mapping, dagPlan, start/finish
+ // time etc).
+ if (dagJson == null) {
+ dagJson = jsonObject;
+ }
+ JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+ JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO);
+ populateOtherInfo(otherInfo, dagOtherInfo);
+ break;
+ case Constants.TEZ_VERTEX_ID:
+ String vertexName = entity;
+ TezVertexID tezVertexID = TezVertexID.fromString(vertexName);
+ if (!tezDAGID.equals(tezVertexID.getDAGId())) {
+ LOG.warn(vertexName + " does not belong to " + tezDAGID);
+ continue;
+ }
+ if (!vertexJsonMap.containsKey(vertexName)) {
+ vertexJsonMap.put(vertexName, jsonObject);
+ }
+ otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+ populateOtherInfo(otherInfo, vertexName, vertexJsonMap);
+ break;
+ case Constants.TEZ_TASK_ID:
+ String taskName = entity;
+ TezTaskID tezTaskID = TezTaskID.fromString(taskName);
+ if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) {
+ LOG.warn(taskName + " does not belong to " + tezDAGID);
+ continue;
+ }
+ if (!taskJsonMap.containsKey(taskName)) {
+ taskJsonMap.put(taskName, jsonObject);
+ }
+ otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+ populateOtherInfo(otherInfo, taskName, taskJsonMap);
+ break;
+ case Constants.TEZ_TASK_ATTEMPT_ID:
+ String taskAttemptName = entity;
+ TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(taskAttemptName);
+ if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) {
+ LOG.warn(taskAttemptName + " does not belong to " + tezDAGID);
+ continue;
+ }
+ if (!attemptJsonMap.containsKey(taskAttemptName)) {
+ attemptJsonMap.put(taskAttemptName, jsonObject);
+ }
+ otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+ populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap);
+ break;
+ default:
+ break;
+ }
+ }
+ scanner.close();
+ if (dagJson != null) {
+ this.dagInfo = DagInfo.create(dagJson);
+ } else {
+ LOG.error("Dag is not yet parsed. Looks like partial file.");
+ throw new TezException(
+ "Please provide a valid/complete history log file containing " + dagId);
+ }
+ for (JSONObject jsonObject : vertexJsonMap.values()) {
+ VertexInfo vertexInfo = VertexInfo.create(jsonObject);
+ this.vertexList.add(vertexInfo);
+ LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
+ }
+ for (JSONObject jsonObject : taskJsonMap.values()) {
+ TaskInfo taskInfo = TaskInfo.create(jsonObject);
+ this.taskList.add(taskInfo);
+ LOG.debug("Parsed task {}", taskInfo.getTaskId());
+ }
+ for (JSONObject jsonObject : attemptJsonMap.values()) {
+ /**
+ * For converting SimpleHistoryLogging to in-memory representation
+ *
+ * We need to get "relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
+ * "entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
+ * "entitytype":"containerId"} and populate it in otherInfo object so that in-memory
+ * representation can parse it correctly
+ */
+ JSONObject subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+ .optJSONObject(0);
+ if (subJsonObject != null) {
+ String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
+ if (!Strings.isNullOrEmpty(nodeId) && nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
+ //populate it in otherInfo
+ JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+ String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
+ if (otherInfo != null && nodeIdVal != null) {
+ otherInfo.put(Constants.NODE_ID, nodeIdVal);
+ }
+ }
+ }
+
+ subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+ .optJSONObject(1);
+ if (subJsonObject != null) {
+ String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
+ if (!Strings.isNullOrEmpty(containerId) && containerId.equalsIgnoreCase(Constants.CONTAINER_ID)) {
+ //populate it in otherInfo
+ JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+ String containerIdVal = subJsonObject.optString(Constants.ENTITY);
+ if (otherInfo != null && containerIdVal != null) {
+ otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
+ }
+ }
+ }
+ TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
+ this.attemptList.add(attemptInfo);
+ LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
+ }
+ }
+}
\ No newline at end of file
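A minimal sketch of driving the new parser (the log path is illustrative; dagId must be the full "dag_*" id string present in the file):

    File simpleHistoryLog = new File("/path/to/simple-history.log");  // file written by SimpleHistoryLoggingService
    DagInfo dagInfo = new SimpleHistoryParser(simpleHistoryLog).getDAGData(dagId);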
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index d2dac7d..6e227a5 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -33,6 +33,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -164,13 +165,25 @@ public class VertexInfo extends BaseInfo {
updateEdgeInfo();
}
+ public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() {
+ return Collections.unmodifiableList(additionalInputInfoList);
+ }
+
+ public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() {
+ return Collections.unmodifiableList(additionalOutputInfoList);
+ }
+
@Override
public final long getStartTimeInterval() {
return startTime - (dagInfo.getStartTime());
}
public final long getFirstTaskStartTimeInterval() {
- return getFirstTaskToStart().getStartTimeInterval();
+ TaskInfo firstTask = getFirstTaskToStart();
+ if (firstTask == null) {
+ return 0;
+ }
+ return firstTask.getStartTimeInterval();
}
public final long getLastTaskFinishTimeInterval() {
@@ -270,14 +283,32 @@ public class VertexInfo extends BaseInfo {
}
+
+ private List<TaskInfo> getTasksInternal() {
+ return Lists.newLinkedList(taskInfoMap.values());
+ }
+
/**
* Get all tasks
*
* @return list of taskInfo
*/
public final List<TaskInfo> getTasks() {
- List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
- Collections.sort(taskInfoList, orderingOnStartTime());
+ return Collections.unmodifiableList(getTasksInternal());
+ }
+
+ /**
+ * Get all tasks in sorted order
+ *
+ * @param sorted
+ * @param ordering
+ * @return list of TaskInfo
+ */
+ public final List<TaskInfo> getTasks(boolean sorted, @Nullable Ordering<TaskInfo> ordering) {
+ List<TaskInfo> taskInfoList = getTasksInternal();
+ if (sorted) {
+ Collections.sort(taskInfoList, ((ordering == null) ? orderingOnStartTime() : ordering));
+ }
return Collections.unmodifiableList(taskInfoList);
}
@@ -352,12 +383,36 @@ public class VertexInfo extends BaseInfo {
return Collections.unmodifiableList(outputVertices);
}
- public List<TaskAttemptInfo> getTaskAttempts() {
+ private List<TaskAttemptInfo> getTaskAttemptsInternal() {
List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
for (TaskInfo taskInfo : getTasks()) {
taskAttemptInfos.addAll(taskInfo.getTaskAttempts());
}
- Collections.sort(taskAttemptInfos, orderingOnAttemptStartTime());
+ return taskAttemptInfos;
+ }
+
+ /**
+ * Get all task attempts
+ *
+ * @return List<TaskAttemptInfo> list of attempts
+ */
+ public List<TaskAttemptInfo> getTaskAttempts() {
+ return Collections.unmodifiableList(getTaskAttemptsInternal());
+ }
+
+ /**
+ * Get all task attempts in sorted order
+ *
+ * @param sorted
+ * @param ordering
+ * @return list of TaskAttemptInfo
+ */
+ public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted,
+ @Nullable Ordering<TaskAttemptInfo> ordering) {
+ List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal();
+ if (sorted) {
+ Collections.sort(taskAttemptInfos, ((ordering == null) ? orderingOnAttemptStartTime() : ordering));
+ }
return Collections.unmodifiableList(taskAttemptInfos);
}
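The unsorted getters stay cheap, and callers that need ordering now ask for it explicitly; for example:

    // Sorted by start time (the default ordering when null is passed).
    List<TaskAttemptInfo> byStart = vertexInfo.getTaskAttempts(true, null);
    // Unsorted view, no extra sort cost.
    List<TaskInfo> tasks = vertexInfo.getTasks();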
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
deleted file mode 100644
index 0d76e03..0000000
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.history;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.DAGCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.examples.WordCount;
-import org.apache.tez.history.parser.ATSFileParser;
-import org.apache.tez.history.parser.datamodel.DagInfo;
-import org.apache.tez.history.parser.datamodel.EdgeInfo;
-import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
-import org.apache.tez.history.parser.datamodel.TaskInfo;
-import org.apache.tez.history.parser.datamodel.VersionInfo;
-import org.apache.tez.history.parser.datamodel.VertexInfo;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
-import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
-import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.tests.MiniTezClusterWithTimeline;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestATSFileParser {
-
- private static MiniDFSCluster miniDFSCluster;
- private static MiniTezClusterWithTimeline miniTezCluster;
-
- //location within miniHDFS cluster's hdfs
- private static Path inputLoc = new Path("/tmp/sample.txt");
-
- private final static String INPUT = "Input";
- private final static String OUTPUT = "Output";
- private final static String TOKENIZER = "Tokenizer";
- private final static String SUMMATION = "Summation";
-
- private static Configuration conf = new Configuration();
- private static FileSystem fs;
- private static String TEST_ROOT_DIR =
- "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tmpDir";
- private static String TEZ_BASE_DIR =
- "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez";
- private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
-
- private static TezClient tezClient;
-
- private static int dagNumber;
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- conf = new Configuration();
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
- EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
- miniDFSCluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- fs = miniDFSCluster.getFileSystem();
- conf.set("fs.defaultFS", fs.getUri().toString());
-
- setupTezCluster();
- }
-
- @AfterClass
- public static void shutdownCluster() {
- try {
- if (tezClient != null) {
- try {
- tezClient.stop();
- } catch (TezException e) {
- //ignore
- } catch (IOException e) {
- //ignore
- }
- }
- if (miniDFSCluster != null) {
- miniDFSCluster.shutdown();
- }
- if (miniTezCluster != null) {
- miniTezCluster.stop();
- }
- } finally {
- try {
- FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
- FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
- } catch (IOException e) {
- //safe to ignore
- }
- }
- }
-
- // @Before
- public static void setupTezCluster() throws Exception {
- conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
- conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
- conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
-
- //Enable per edge counters
- conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
- conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
- conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
- .class.getName());
-
- miniTezCluster =
- new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
-
- miniTezCluster.init(conf);
- miniTezCluster.start();
-
- createSampleFile(inputLoc);
-
- TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
- tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
- tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
- tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
- ATSHistoryLoggingService.class.getName());
-
- tezClient = TezClient.create("WordCount", tezConf, true);
- tezClient.start();
- tezClient.waitTillReady();
- }
-
-
- /**
- * Run a word count example in mini cluster and check if it is possible to download
- * data from ATS and parse it.
- *
- * @throws Exception
- */
- @Test
- public void testParserWithSuccessfulJob() throws Exception {
- //Run basic word count example.
- String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
- WordCount.SumProcessor.class.getName(), "WordCount");
-
- //Export the data from ATS
- String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
-
- int result = ATSImportTool.process(args);
- assertTrue(result == 0);
-
- //Parse ATS data
- DagInfo dagInfo = getDagInfo(dagId);
-
- //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
- verifyDagInfo(dagInfo);
-
- //Job specific
- assertTrue(dagInfo.getNumVertices() == 2);
- assertTrue(dagInfo.getName().equals("WordCount"));
- assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
- WordCount.TokenProcessor.class.getName()));
- assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
- .equals(WordCount.SumProcessor.class.getName()));
- assertTrue(dagInfo.getEdges().size() == 1);
- EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
- assertTrue(edgeInfo.getDataMovementType().
- equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
- assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
- assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
- assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
- assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
- assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
- assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
- assertTrue(dagInfo.getVertices().size() == 2);
- String lastSourceTA = null;
- String lastDataEventSourceTA = null;
- for (VertexInfo vertexInfo : dagInfo.getVertices()) {
- assertTrue(vertexInfo.getKilledTasksCount() == 0);
- assertTrue(vertexInfo.getInitRequestedTime() > 0);
- assertTrue(vertexInfo.getInitTime() > 0);
- assertTrue(vertexInfo.getStartRequestedTime() > 0);
- assertTrue(vertexInfo.getStartTime() > 0);
- assertTrue(vertexInfo.getFinishTime() > 0);
- long finishTime = 0;
- for (TaskInfo taskInfo : vertexInfo.getTasks()) {
- assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
- assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
- assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
- assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
- assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
- assertTrue(taskInfo.getContainersMapping().size() > 0);
- assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
- assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
- assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
- List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
- if (vertexInfo.getVertexName().equals(TOKENIZER)) {
- // get the last task to finish and track its successful attempt
- if (finishTime < taskInfo.getFinishTime()) {
- finishTime = taskInfo.getFinishTime();
- lastSourceTA = taskInfo.getSuccessfulAttemptId();
- }
- } else {
- for (TaskAttemptInfo attempt : attempts) {
- assertTrue(attempt.getLastDataEventTime() > 0);
- if (lastDataEventSourceTA == null) {
- lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
- } else {
- // all attempts should have the same last data event source TA
- assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
- }
- }
- }
- for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
- assertTrue(attemptInfo.getStartTimeInterval() > 0);
- assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
- }
- }
- assertTrue(vertexInfo.getLastTaskToFinish() != null);
- if (vertexInfo.getVertexName().equals(TOKENIZER)) {
- assertTrue(vertexInfo.getInputEdges().size() == 0);
- assertTrue(vertexInfo.getOutputEdges().size() == 1);
- assertTrue(vertexInfo.getOutputVertices().size() == 1);
- assertTrue(vertexInfo.getInputVertices().size() == 0);
- } else {
- assertTrue(vertexInfo.getInputEdges().size() == 1);
- assertTrue(vertexInfo.getOutputEdges().size() == 0);
- assertTrue(vertexInfo.getOutputVertices().size() == 0);
- assertTrue(vertexInfo.getInputVertices().size() == 1);
- }
- }
- assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
- }
-
- /**
- * Run a word count example in mini cluster.
- * Provide invalid URL for ATS.
- *
- * @throws Exception
- */
- @Test
- public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
- //Run basic word count example.
- String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
- WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL");
-
- //Export the data from ATS
- String atsAddress = "--atsAddress=http://atsHost:8188";
- String[] args = { "--dagId=" + dagId,
- "--downloadDir=" + DOWNLOAD_DIR,
- atsAddress
- };
-
- int result = ATSImportTool.process(args);
- assertTrue(result == -1);
- }
-
- /**
- * Run a failed job and parse the data from ATS
- */
- @Test
- public void testParserWithFailedJob() throws Exception {
- //Run a job which would fail
- String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
- .getName(), "WordCount-With-Exception");
-
- //Export the data from ATS
- String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
-
- int result = ATSImportTool.process(args);
- assertTrue(result == 0);
-
- //Parse ATS data
- DagInfo dagInfo = getDagInfo(dagId);
-
- //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
- verifyDagInfo(dagInfo);
-
- //Dag specific
- VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
- assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed
- assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4);
- assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
-
- assertTrue(dagInfo.getFailedVertices().size() == 1);
- assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
- assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
- assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
-
- assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
-
- verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4);
- verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1);
- verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5);
-
- verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
- "TaskCounter_Tokenizer_INPUT_Input", 10);
- verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
- "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
- verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
- "TaskCounter_Tokenizer_OUTPUT_Summation",
- 20); //Every line has 2 words. 10 lines x 2 words = 20
- verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
- "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
-
- for (TaskInfo taskInfo : summationVertex.getTasks()) {
- String lastAttemptId = null;
- for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
- if (lastAttemptId != null) {
- // failed attempt should be causal TA of next attempt
- assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
- }
- lastAttemptId = attemptInfo.getTaskAttemptId();
- }
- }
-
- //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
- //TaskCounter.REDUCE_INPUT_RECORDS
-
- //Verify if the processor exception is given in diagnostics
- assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
-
- }
-
- /**
- * Create sample file for wordcount program
- *
- * @param inputLoc
- * @throws IOException
- */
- private static void createSampleFile(Path inputLoc) throws IOException {
- fs.deleteOnExit(inputLoc);
- FSDataOutputStream out = fs.create(inputLoc);
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
- for (int i = 0; i < 10; i++) {
- writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
- writer.newLine();
- }
- writer.close();
- }
-
- private DagInfo getDagInfo(String dagId) throws TezException {
- //Parse downloaded contents
- File downloadedFile = new File(DOWNLOAD_DIR
- + Path.SEPARATOR + dagId
- + Path.SEPARATOR + dagId + ".zip");
- ATSFileParser parser = new ATSFileParser(downloadedFile);
- DagInfo dagInfo = parser.getDAGData(dagId);
- assertTrue(dagInfo.getDagId().equals(dagId));
- return dagInfo;
- }
-
- private void verifyCounter(Map<String, TezCounter> counterMap,
- String counterGroupName, long expectedVal) {
- //Iterate through group-->tezCounter
- for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
- if (counterGroupName != null) {
- if (entry.getKey().equals(counterGroupName)) {
- assertTrue(entry.getValue().getValue() == expectedVal);
- }
- } else {
- assertTrue(entry.getValue().getValue() == expectedVal);
- }
- }
- }
-
- private String runWordCount(String tokenizerProcessor, String summationProcessor,
- String dagName)
- throws Exception {
- dagNumber++;
-
- //HDFS path
- Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
-
- DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
- TextInputFormat.class, inputLoc.toString()).build();
-
- DataSinkDescriptor dataSink =
- MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build();
-
- Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
- tokenizerProcessor)).addDataSource(INPUT, dataSource);
-
- OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
- .newBuilder(Text.class.getName(), IntWritable.class.getName(),
- HashPartitioner.class.getName()).build();
-
- Vertex summationVertex = Vertex.create(SUMMATION,
- ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink);
-
- // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge
- DAG dag = DAG.create(dagName);
- dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
- Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
-
- DAGClient client = tezClient.submitDAG(dag);
- client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
- TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), dagNumber);
-
- return tezDAGID.toString();
- }
-
- /**
- * Processor which would just throw exception.
- */
- public static class FailProcessor extends SimpleMRProcessor {
- public FailProcessor(ProcessorContext context) {
- super(context);
- }
-
- @Override
- public void run() throws Exception {
- throw new Exception("Failing this processor for some reason");
- }
- }
-
- private void verifyDagInfo(DagInfo dagInfo) {
- VersionInfo versionInfo = dagInfo.getVersionInfo();
- assertTrue(versionInfo != null); //should be present post 0.5.4
- assertTrue(versionInfo.getVersion() != null);
- assertTrue(versionInfo.getRevision() != null);
- assertTrue(versionInfo.getBuildTime() != null);
-
- assertTrue(dagInfo.getStartTime() > 0);
- assertTrue(dagInfo.getFinishTimeInterval() > 0);
- assertTrue(dagInfo.getStartTimeInterval() == 0);
- assertTrue(dagInfo.getStartTime() > 0);
- if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
- assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
- }
- assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval());
-
- assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
- assertTrue(dagInfo.getTimeTaken() > 0);
-
- //Verify all vertices
- for (VertexInfo vertexInfo : dagInfo.getVertices()) {
- verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
- }
-
- VertexInfo fastestVertex = dagInfo.getFastestVertex();
- assertTrue(fastestVertex != null);
-
- if (dagInfo.getStatus().equals(DAGState.SUCCEEDED)) {
- assertTrue(dagInfo.getSlowestVertex() != null);
- }
- }
-
- private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
- assertTrue(vertexInfo != null);
- if (hasFailedTasks) {
- assertTrue(vertexInfo.getFailedTasksCount() > 0);
- }
- assertTrue(vertexInfo.getStartTimeInterval() > 0);
- assertTrue(vertexInfo.getStartTime() > 0);
- assertTrue(vertexInfo.getFinishTimeInterval() > 0);
- assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval());
- assertTrue(vertexInfo.getVertexName() != null);
- if (!hasFailedTasks) {
- assertTrue(vertexInfo.getFinishTime() > 0);
- assertTrue(vertexInfo.getFailedTasks().size() == 0);
- assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
- assertTrue(vertexInfo.getFailedTasksCount() == 0);
- assertTrue(vertexInfo.getAvgTaskDuration() > 0);
- assertTrue(vertexInfo.getMaxTaskDuration() > 0);
- assertTrue(vertexInfo.getMinTaskDuration() > 0);
- assertTrue(vertexInfo.getTimeTaken() > 0);
- assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
- assertTrue(vertexInfo.getCompletedTasksCount() > 0);
- assertTrue(vertexInfo.getFirstTaskToStart() != null);
- assertTrue(vertexInfo.getSucceededTasksCount() > 0);
- assertTrue(vertexInfo.getTasks().size() > 0);
- }
-
- for (TaskInfo taskInfo : vertexInfo.getTasks()) {
- if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
- verifyTask(taskInfo, false);
- }
- }
-
- for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
- verifyTask(taskInfo, true);
- }
-
- assertTrue(vertexInfo.getProcessorClassName() != null);
- assertTrue(vertexInfo.getStatus() != null);
- assertTrue(vertexInfo.getDagInfo() != null);
- assertTrue(vertexInfo.getInitTimeInterval() > 0);
- assertTrue(vertexInfo.getNumTasks() > 0);
- }
-
- private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
- assertTrue(taskInfo != null);
- assertTrue(taskInfo.getStatus() != null);
- assertTrue(taskInfo.getStartTimeInterval() > 0);
-
- //Not testing for killed attempts. So if there are no failures, it should succeed
- if (!hasFailedAttempts) {
- assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
- assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo
- .getFinishTimeInterval());
- assertTrue(
- taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
- assertTrue(taskInfo.getSuccessfulAttemptId() != null);
- assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
- }
- assertTrue(taskInfo.getTaskId() != null);
-
- for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
- verifyTaskAttemptInfo(attemptInfo);
- }
- }
-
- private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
- if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
- .equals(TaskAttemptState.SUCCEEDED)) {
- assertTrue(attemptInfo.getStartTimeInterval() > 0);
- assertTrue(attemptInfo.getFinishTimeInterval() > 0);
- assertTrue(attemptInfo.getStartTime() > 0);
- assertTrue(attemptInfo.getFinishTime() > 0);
- assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
- assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval());
- assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval());
- assertTrue(attemptInfo.getNodeId() != null);
- assertTrue(attemptInfo.getTimeTaken() != -1);
- assertTrue(attemptInfo.getEvents() != null);
- assertTrue(attemptInfo.getTezCounters() != null);
- assertTrue(attemptInfo.getContainer() != null);
- }
- assertTrue(attemptInfo.getTaskInfo() != null);
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
new file mode 100644
index 0000000..c89acb2
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -0,0 +1,785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.examples.WordCount;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.BaseInfo;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHistoryParser {
+
+ private static MiniDFSCluster miniDFSCluster;
+ private static MiniTezClusterWithTimeline miniTezCluster;
+
+ //location within miniHDFS cluster's hdfs
+ private static Path inputLoc = new Path("/tmp/sample.txt");
+
+ private final static String INPUT = "Input";
+ private final static String OUTPUT = "Output";
+ private final static String TOKENIZER = "Tokenizer";
+ private final static String SUMMATION = "Summation";
+ private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+ private final static String HISTORY_TXT = "history.txt";
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem fs;
+ private static String TEST_ROOT_DIR =
+ "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tmpDir";
+ private static String TEZ_BASE_DIR =
+ "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tez";
+ private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+ EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ miniDFSCluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ fs = miniDFSCluster.getFileSystem();
+ conf.set("fs.defaultFS", fs.getUri().toString());
+
+ setupTezCluster();
+ }
+
+ @AfterClass
+ public static void shutdownCluster() {
+ try {
+ if (miniDFSCluster != null) {
+ miniDFSCluster.shutdown();
+ }
+ if (miniTezCluster != null) {
+ miniTezCluster.stop();
+ }
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
+ FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
+ } catch (IOException e) {
+ //safe to ignore
+ }
+ }
+ }
+
+ // @Before
+ public static void setupTezCluster() throws Exception {
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+ //Enable per edge counters
+ conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+ conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
+ .class.getName());
+
+ conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
+
+ miniTezCluster =
+ new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
+
+ miniTezCluster.init(conf);
+ miniTezCluster.start();
+
+ createSampleFile(inputLoc);
+
+ TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+ tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+ tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ ATSHistoryLoggingService.class.getName());
+
+ }
+
+
+ /**
+ * Run a word count example in mini cluster and check if it is possible to download
+ * data from ATS and parse it. Also, run with SimpleHistoryLogging option and verify
+ * if it matches with ATS data.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testParserWithSuccessfulJob() throws Exception {
+ //Run basic word count example.
+ String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+ WordCount.SumProcessor.class.getName(), "WordCount", true);
+
+ //Export the data from ATS
+ String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+ int result = ATSImportTool.process(args);
+ assertTrue(result == 0);
+
+ //Parse ATS data and verify results
+ DagInfo dagInfoFromATS = getDagInfo(dagId);
+ verifyDagInfo(dagInfoFromATS, true);
+ verifyJobSpecificInfo(dagInfoFromATS);
+
+ //Now run with SimpleHistoryLogging
+ dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+ WordCount.SumProcessor.class.getName(), "WordCount", false);
+ Thread.sleep(10000); //Wait for all history events to be flushed so the downloaded data is complete.
+
+ DagInfo shDagInfo = getDagInfoFromSimpleHistory(dagId);
+ verifyDagInfo(shDagInfo, false);
+ verifyJobSpecificInfo(shDagInfo);
+
+ //Compare dagInfo by parsing ATS data with DagInfo obtained by parsing SimpleHistoryLog
+ isDAGEqual(dagInfoFromATS, shDagInfo);
+ }
+
+ private DagInfo getDagInfoFromSimpleHistory(String dagId) throws TezException, IOException {
+ TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+ ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID
+ .getApplicationId(), 1);
+ Path historyPath = new Path(conf.get("fs.defaultFS")
+ + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+ + applicationAttemptId);
+ FileSystem fs = historyPath.getFileSystem(conf);
+
+ Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT);
+ fs.copyToLocalFile(historyPath, localPath);
+ File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
+
+ //Now parse via SimpleHistory
+ SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+ DagInfo dagInfo = parser.getDAGData(dagId);
+ assertTrue(dagInfo.getDagId().equals(dagId));
+ return dagInfo;
+ }
+
+ private void verifyJobSpecificInfo(DagInfo dagInfo) {
+ //Job specific
+ assertTrue(dagInfo.getNumVertices() == 2);
+ assertTrue(dagInfo.getName().equals("WordCount"));
+ assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
+ WordCount.TokenProcessor.class.getName()));
+ assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
+ .equals(WordCount.SumProcessor.class.getName()));
+ assertTrue(dagInfo.getEdges().size() == 1);
+ EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
+ assertTrue(edgeInfo.getDataMovementType().
+ equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
+ assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
+ assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
+ assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
+ assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
+ assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
+ assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
+ assertTrue(dagInfo.getVertices().size() == 2);
+ String lastSourceTA = null;
+ String lastDataEventSourceTA = null;
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ assertTrue(vertexInfo.getKilledTasksCount() == 0);
+ assertTrue(vertexInfo.getInitRequestedTime() > 0);
+ assertTrue(vertexInfo.getInitTime() > 0);
+ assertTrue(vertexInfo.getStartRequestedTime() > 0);
+ assertTrue(vertexInfo.getStartTime() > 0);
+ assertTrue(vertexInfo.getFinishTime() > 0);
+ long finishTime = 0;
+ for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+ assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
+ assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
+ assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
+ assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
+ assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
+ assertTrue(taskInfo.getContainersMapping().size() > 0);
+ assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
+ assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
+ assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+ List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
+ if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+ // get the last task to finish and track its successful attempt
+ if (finishTime < taskInfo.getFinishTime()) {
+ finishTime = taskInfo.getFinishTime();
+ lastSourceTA = taskInfo.getSuccessfulAttemptId();
+ }
+ } else {
+ for (TaskAttemptInfo attempt : attempts) {
+ assertTrue(attempt.getLastDataEventTime() > 0);
+ if (lastDataEventSourceTA == null) {
+ lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+ } else {
+ // all attempts should have the same last data event source TA
+ assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+ }
+ }
+ }
+ for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+ assertTrue(attemptInfo.getStartTimeInterval() > 0);
+ assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
+ }
+ }
+ assertTrue(vertexInfo.getLastTaskToFinish() != null);
+ if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+ assertTrue(vertexInfo.getInputEdges().size() == 0);
+ assertTrue(vertexInfo.getOutputEdges().size() == 1);
+ assertTrue(vertexInfo.getOutputVertices().size() == 1);
+ assertTrue(vertexInfo.getInputVertices().size() == 0);
+ } else {
+ assertTrue(vertexInfo.getInputEdges().size() == 1);
+ assertTrue(vertexInfo.getOutputEdges().size() == 0);
+ assertTrue(vertexInfo.getOutputVertices().size() == 0);
+ assertTrue(vertexInfo.getInputVertices().size() == 1);
+ }
+ }
+ assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
+ }
+
+ /**
+ * Run a word count example in mini cluster.
+ * Provide invalid URL for ATS.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
+ //Run basic word count example.
+ String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+ WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL", true);
+
+ //Export the data from ATS
+ String atsAddress = "--atsAddress=http://atsHost:8188";
+ String[] args = { "--dagId=" + dagId,
+ "--downloadDir=" + DOWNLOAD_DIR,
+ atsAddress
+ };
+
+ int result = ATSImportTool.process(args);
+ assertTrue(result == -1);
+ }
+
+ /**
+ * Run a failed job and parse the data from ATS
+ */
+ @Test
+ public void testParserWithFailedJob() throws Exception {
+ //Run a job which would fail
+ String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
+ .getName(), "WordCount-With-Exception", true);
+
+ //Export the data from ATS
+ String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+ int result = ATSImportTool.process(args);
+ assertTrue(result == 0);
+
+ //Parse ATS data
+ DagInfo dagInfo = getDagInfo(dagId);
+
+ //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
+ verifyDagInfo(dagInfo, true);
+
+ //Dag specific
+ VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
+ assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed
+ assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4);
+ assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
+
+ assertTrue(dagInfo.getFailedVertices().size() == 1);
+ assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
+ assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
+ assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
+
+ assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
+
+ verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4);
+ verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1);
+ verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5);
+
+ verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
+ "TaskCounter_Tokenizer_INPUT_Input", 10);
+ verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
+ "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
+ verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
+ "TaskCounter_Tokenizer_OUTPUT_Summation",
+ 20); //Every line has 2 words. 10 lines x 2 words = 20
+ verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
+ "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
+
+ for (TaskInfo taskInfo : summationVertex.getTasks()) {
+ String lastAttemptId = null;
+ for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+ if (lastAttemptId != null) {
+ // failed attempt should be causal TA of next attempt
+ assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
+ }
+ lastAttemptId = attemptInfo.getTaskAttemptId();
+ }
+ }
+
+ //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
+ //TaskCounter.REDUCE_INPUT_RECORDS
+
+ //Verify if the processor exception is given in diagnostics
+ assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
+
+ }
+
+ /**
+ * Comparing DAG/Vertex/Edge explicitly here instead of overriding equals() in those classes,
+ * which would also require overriding hashCode(). Some comparisons are customized for unit testing.
+ */
+ private void isDAGEqual(DagInfo dagInfo1, DagInfo dagInfo2) {
+ assertNotNull(dagInfo1);
+ assertNotNull(dagInfo2);
+ assertEquals(dagInfo1.getStatus(), dagInfo2.getStatus());
+ isEdgeEqual(dagInfo1.getEdges(), dagInfo2.getEdges());
+ isVertexEqual(dagInfo1.getVertices(), dagInfo2.getVertices());
+ }
+
+ private void isVertexEqual(VertexInfo vertexInfo1, VertexInfo vertexInfo2) {
+ assertTrue(vertexInfo1 != null);
+ assertTrue(vertexInfo2 != null);
+ assertTrue(vertexInfo1.getVertexName().equals(vertexInfo2.getVertexName()));
+ assertTrue(vertexInfo1.getProcessorClassName().equals(vertexInfo2.getProcessorClassName()));
+ assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+ assertTrue(vertexInfo1.getCompletedTasksCount() == vertexInfo2.getCompletedTasksCount());
+ assertTrue(vertexInfo1.getStatus().equals(vertexInfo2.getStatus()));
+
+ isEdgeEqual(vertexInfo1.getInputEdges(), vertexInfo2.getInputEdges());
+ isEdgeEqual(vertexInfo1.getOutputEdges(), vertexInfo2.getOutputEdges());
+
+ assertTrue(vertexInfo1.getInputVertices().size() == vertexInfo2.getInputVertices().size());
+ assertTrue(vertexInfo1.getOutputVertices().size() == vertexInfo2.getOutputVertices().size());
+
+ assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+ isTaskEqual(vertexInfo1.getTasks(), vertexInfo2.getTasks());
+ }
+
+ private void isVertexEqual(List<VertexInfo> vertexList1, List<VertexInfo> vertexList2) {
+ assertTrue("Vertices sizes should be the same", vertexList1.size() == vertexList2.size());
+ Iterator<VertexInfo> it1 = vertexList1.iterator();
+ Iterator<VertexInfo> it2 = vertexList2.iterator();
+ while (it1.hasNext()) {
+ assertTrue(it2.hasNext());
+ VertexInfo info1 = it1.next();
+ VertexInfo info2 = it2.next();
+ isVertexEqual(info1, info2);
+ }
+ }
+
+ private void isEdgeEqual(EdgeInfo edgeInfo1, EdgeInfo edgeInfo2) {
+ assertTrue(edgeInfo1 != null);
+ assertTrue(edgeInfo2 != null);
+ String info1 = edgeInfo1.toString();
+ String info2 = edgeInfo2.toString();
+ assertTrue(info1.equals(info2));
+ }
+
+ private void isEdgeEqual(Collection<EdgeInfo> info1, Collection<EdgeInfo> info2) {
+ assertTrue("sizes should be the same", info1.size() == info1.size());
+ Iterator<EdgeInfo> it1 = info1.iterator();
+ Iterator<EdgeInfo> it2 = info2.iterator();
+ while (it1.hasNext()) {
+ assertTrue(it2.hasNext());
+ isEdgeEqual(it1.next(), it2.next());
+ }
+ }
+
+ private void isTaskEqual(Collection<TaskInfo> info1, Collection<TaskInfo> info2) {
+ assertTrue("sizes should be the same", info1.size() == info1.size());
+ Iterator<TaskInfo> it1 = info1.iterator();
+ Iterator<TaskInfo> it2 = info2.iterator();
+ while (it1.hasNext()) {
+ assertTrue(it2.hasNext());
+ isTaskEqual(it1.next(), it2.next());
+ }
+ }
+
+ private void isTaskEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) {
+ assertTrue(taskInfo1 != null);
+ assertTrue(taskInfo2 != null);
+ assertTrue(taskInfo1.getVertexInfo() != null);
+ assertTrue(taskInfo2.getVertexInfo() != null);
+ assertTrue(taskInfo1.getStatus().equals(taskInfo2.getStatus()));
+ assertTrue(
+ taskInfo1.getVertexInfo().getVertexName()
+ .equals(taskInfo2.getVertexInfo().getVertexName()));
+ isTaskAttemptEqual(taskInfo1.getTaskAttempts(), taskInfo2.getTaskAttempts());
+
+ //Verify counters
+ isCountersSame(taskInfo1, taskInfo2);
+ }
+
+ private void isCountersSame(BaseInfo info1, BaseInfo info2) {
+ isCounterSame(info1.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()),
+ info2.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()));
+
+ isCounterSame(info1.getCounter(TaskCounter.SPILLED_RECORDS.name()),
+ info2.getCounter(TaskCounter.SPILLED_RECORDS.name()));
+
+ isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+ info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+ isCounterSame(info1.getCounter(TaskCounter.OUTPUT_BYTES.name()),
+ info2.getCounter(TaskCounter.OUTPUT_BYTES.name()));
+
+ isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+ info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+ isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()),
+ info2.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()));
+
+ isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()),
+ info2.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()));
+ }
+
+ private void isCounterSame(Map<String, TezCounter> counter1, Map<String, TezCounter> counter2) {
+ for (Map.Entry<String, TezCounter> entry : counter1.entrySet()) {
+ String source = entry.getKey();
+ long val = entry.getValue().getValue();
+
+ //check if other counter has the same value
+ assertTrue(counter2.containsKey(entry.getKey()));
+ assertTrue(counter2.get(entry.getKey()).getValue() == val);
+ }
+ }
+
+ private void isTaskAttemptEqual(Collection<TaskAttemptInfo> info1,
+ Collection<TaskAttemptInfo> info2) {
+ assertTrue("sizes should be the same", info1.size() == info1.size());
+ Iterator<TaskAttemptInfo> it1 = info1.iterator();
+ Iterator<TaskAttemptInfo> it2 = info2.iterator();
+ while (it1.hasNext()) {
+ assertTrue(it2.hasNext());
+ isTaskAttemptEqual(it1.next(), it2.next());
+ }
+ }
+
+ private void isTaskAttemptEqual(TaskAttemptInfo info1, TaskAttemptInfo info2) {
+ assertTrue(info1 != null);
+ assertTrue(info2 != null);
+ assertTrue(info1.getTaskInfo() != null);
+ assertTrue(info2.getTaskInfo() != null);
+ assertTrue(info1.getStatus().equals(info2.getStatus()));
+ assertTrue(info1.getTaskInfo().getVertexInfo().getVertexName().equals(info2.getTaskInfo()
+ .getVertexInfo().getVertexName()));
+
+ //Verify counters
+ isCountersSame(info1, info2);
+ }
+
+
+ /**
+ * Create sample file for wordcount program
+ *
+ * @param inputLoc location within the mini HDFS cluster to write the sample input to
+ * @throws IOException
+ */
+ private static void createSampleFile(Path inputLoc) throws IOException {
+ fs.deleteOnExit(inputLoc);
+ FSDataOutputStream out = fs.create(inputLoc);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+ for (int i = 0; i < 10; i++) {
+ writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
+ writer.newLine();
+ }
+ writer.close();
+ }
+
+ private DagInfo getDagInfo(String dagId) throws TezException {
+ //Parse downloaded contents
+ File downloadedFile = new File(DOWNLOAD_DIR
+ + Path.SEPARATOR + dagId
+ + Path.SEPARATOR + dagId + ".zip");
+ ATSFileParser parser = new ATSFileParser(downloadedFile);
+ DagInfo dagInfo = parser.getDAGData(dagId);
+ assertTrue(dagInfo.getDagId().equals(dagId));
+ return dagInfo;
+ }
+
+ private void verifyCounter(Map<String, TezCounter> counterMap,
+ String counterGroupName, long expectedVal) {
+ //Iterate through group-->tezCounter
+ for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
+ if (counterGroupName != null) {
+ if (entry.getKey().equals(counterGroupName)) {
+ assertTrue(entry.getValue().getValue() == expectedVal);
+ }
+ } else {
+ assertTrue(entry.getValue().getValue() == expectedVal);
+ }
+ }
+ }
+
+ TezClient getTezClient(boolean withTimeline) throws Exception {
+ TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+ if (withTimeline) {
+ tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, withTimeline);
+ tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+ tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ ATSHistoryLoggingService.class.getName());
+ } else {
+ tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ SimpleHistoryLoggingService.class.getName());
+ }
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+
+ TezClient tezClient = TezClient.create("WordCount", tezConf, false);
+ tezClient.start();
+ tezClient.waitTillReady();
+ return tezClient;
+ }
+
+ private String runWordCount(String tokenizerProcessor, String summationProcessor,
+ String dagName, boolean withTimeline)
+ throws Exception {
+ //HDFS path
+ Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
+
+ DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
+ TextInputFormat.class, inputLoc.toString()).build();
+
+ DataSinkDescriptor dataSink =
+ MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build();
+
+ Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
+ tokenizerProcessor)).addDataSource(INPUT, dataSource);
+
+ OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
+ .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+ HashPartitioner.class.getName()).build();
+
+ Vertex summationVertex = Vertex.create(SUMMATION,
+ ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink);
+
+ // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge
+ DAG dag = DAG.create(dagName);
+ dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
+ Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
+
+ TezClient tezClient = getTezClient(withTimeline);
+ DAGClient client = tezClient.submitDAG(dag);
+ client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+ TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
+
+ if (tezClient != null) {
+ tezClient.stop();
+ }
+ return tezDAGID.toString();
+ }
+
+ /**
+ * Processor which would just throw exception.
+ */
+ public static class FailProcessor extends SimpleMRProcessor {
+ public FailProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ throw new Exception("Failing this processor for some reason");
+ }
+ }
+
+ private void verifyDagInfo(DagInfo dagInfo, boolean ats) {
+ if (ats) {
+ VersionInfo versionInfo = dagInfo.getVersionInfo();
+ assertTrue(versionInfo != null); //should be present post 0.5.4
+ assertTrue(versionInfo.getVersion() != null);
+ assertTrue(versionInfo.getRevision() != null);
+ assertTrue(versionInfo.getBuildTime() != null);
+ }
+
+ assertTrue(dagInfo.getStartTime() > 0);
+ assertTrue(dagInfo.getFinishTimeInterval() > 0);
+ assertTrue(dagInfo.getStartTimeInterval() == 0);
+ assertTrue(dagInfo.getStartTime() > 0);
+ if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
+ assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
+ }
+ assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval());
+
+ assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
+ assertTrue(dagInfo.getTimeTaken() > 0);
+
+ //Verify all vertices
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
+ }
+
+ VertexInfo fastestVertex = dagInfo.getFastestVertex();
+ assertTrue(fastestVertex != null);
+
+ if (dagInfo.getStatus().equals(DAGState.SUCCEEDED)) {
+ assertTrue(dagInfo.getSlowestVertex() != null);
+ }
+ }
+
+ private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
+ assertTrue(vertexInfo != null);
+ if (hasFailedTasks) {
+ assertTrue(vertexInfo.getFailedTasksCount() > 0);
+ }
+ assertTrue(vertexInfo.getStartTimeInterval() > 0);
+ assertTrue(vertexInfo.getStartTime() > 0);
+ assertTrue(vertexInfo.getFinishTimeInterval() > 0);
+ assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval());
+ assertTrue(vertexInfo.getVertexName() != null);
+ if (!hasFailedTasks) {
+ assertTrue(vertexInfo.getFinishTime() > 0);
+ assertTrue(vertexInfo.getFailedTasks().size() == 0);
+ assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
+ assertTrue(vertexInfo.getFailedTasksCount() == 0);
+ assertTrue(vertexInfo.getAvgTaskDuration() > 0);
+ assertTrue(vertexInfo.getMaxTaskDuration() > 0);
+ assertTrue(vertexInfo.getMinTaskDuration() > 0);
+ assertTrue(vertexInfo.getTimeTaken() > 0);
+ assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
+ assertTrue(vertexInfo.getCompletedTasksCount() > 0);
+ assertTrue(vertexInfo.getFirstTaskToStart() != null);
+ assertTrue(vertexInfo.getSucceededTasksCount() > 0);
+ assertTrue(vertexInfo.getTasks().size() > 0);
+ }
+
+ for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+ if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
+ verifyTask(taskInfo, false);
+ }
+ }
+
+ for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
+ verifyTask(taskInfo, true);
+ }
+
+ assertTrue(vertexInfo.getProcessorClassName() != null);
+ assertTrue(vertexInfo.getStatus() != null);
+ assertTrue(vertexInfo.getDagInfo() != null);
+ assertTrue(vertexInfo.getInitTimeInterval() > 0);
+ assertTrue(vertexInfo.getNumTasks() > 0);
+ }
+
+ private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
+ assertTrue(taskInfo != null);
+ assertTrue(taskInfo.getStatus() != null);
+ assertTrue(taskInfo.getStartTimeInterval() > 0);
+
+ //Not testing for killed attempts. So if there are no failures, it should succeed
+ if (!hasFailedAttempts) {
+ assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
+ assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo
+ .getFinishTimeInterval());
+ assertTrue(
+ taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
+ assertTrue(taskInfo.getSuccessfulAttemptId() != null);
+ assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+ }
+ assertTrue(taskInfo.getTaskId() != null);
+
+ for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+ verifyTaskAttemptInfo(attemptInfo);
+ }
+ }
+
+ private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
+ if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
+ .equals(TaskAttemptState.SUCCEEDED)) {
+ assertTrue(attemptInfo.getStartTimeInterval() > 0);
+ assertTrue(attemptInfo.getFinishTimeInterval() > 0);
+ assertTrue(attemptInfo.getStartTime() > 0);
+ assertTrue(attemptInfo.getFinishTime() > 0);
+ assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
+ assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval());
+ assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval());
+ assertTrue(attemptInfo.getNodeId() != null);
+ assertTrue(attemptInfo.getTimeTaken() != -1);
+ assertTrue(attemptInfo.getEvents() != null);
+ assertTrue(attemptInfo.getTezCounters() != null);
+ assertTrue(attemptInfo.getContainer() != null);
+ }
+ assertTrue(attemptInfo.getTaskInfo() != null);
+ }
+}
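
For readers who only want the offline half of what the test above exercises, here is a hedged,
stand-alone sketch. The file locations and the DAG id are placeholders; only ATSFileParser,
SimpleHistoryParser and getDAGData(...) come from the code above.

    import java.io.File;
    import org.apache.tez.history.parser.ATSFileParser;
    import org.apache.tez.history.parser.SimpleHistoryParser;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class OfflineParseSketch {
      public static void main(String[] args) throws Exception {
        String dagId = "dag_1234567890123_0001_1";                          // placeholder
        // Zip produced by ATSImportTool under <downloadDir>/<dagId>/<dagId>.zip
        File atsZip = new File("/tmp/download/" + dagId + "/" + dagId + ".zip");
        DagInfo fromAts = new ATSFileParser(atsZip).getDAGData(dagId);

        // Local copy of the SimpleHistoryLoggingService output (history.txt.<appAttemptId>)
        File historyTxt = new File("/tmp/history.txt");                     // placeholder
        DagInfo fromSimpleHistory = new SimpleHistoryParser(historyTxt).getDAGData(dagId);

        System.out.println(fromAts.getStatus() + " / " + fromSimpleHistory.getStatus());
      }
    }
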
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml
index fe28b14..36b12fe 100644
--- a/tez-tools/analyzers/job-analyzer/pom.xml
+++ b/tez-tools/analyzers/job-analyzer/pom.xml
@@ -38,6 +38,15 @@
<artifactId>tez-history-parser</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.plutext</groupId>
+ <artifactId>jaxb-svg11</artifactId>
+ <version>1.0.2</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
index 4151a90..27ad95e 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
@@ -19,15 +19,14 @@
package org.apache.tez.analyzer;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.common.base.Strings;
import org.apache.tez.dag.api.TezException;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
@@ -99,7 +98,7 @@ public class CSVResult implements Result {
StringBuilder sb = new StringBuilder();
for(int i=0;i<record.length;i++) {
- sb.append(Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
+ sb.append(!Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
if (i < record.length - 1) {
sb.append(",");
}
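
The hunk above fixes an inverted null/empty check when a record is joined into a CSV line. A
small illustration of the corrected behaviour (the sample record and the surrounding loop are
only a sketch of the code shown in the diff):

    import com.google.common.base.Strings;

    String[] record = { "vertex_1", null, "42" };   // record with a missing field
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < record.length; i++) {
      // Keep non-empty values; substitute a single space for null/empty cells.
      sb.append(!Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
      if (i < record.length - 1) {
        sb.append(",");
      }
    }
    // sb.toString() -> "vertex_1, ,42"
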
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 6748f3f..88d45f3 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -19,6 +19,8 @@
package org.apache.tez.analyzer.plugins;
import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -26,10 +28,13 @@ import com.google.common.collect.Ordering;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -43,24 +48,43 @@ public class CriticalPathAnalyzer implements Analyzer {
private final CSVResult csvResult;
+ private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+ private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+ private final String dotFileLocation;
+
+ private static final String CONNECTOR = "-->";
+
public CriticalPathAnalyzer(Configuration config) {
this.config = config;
this.csvResult = new CSVResult(headers);
+ this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
}
@Override public void analyze(DagInfo dagInfo) throws TezException {
Map<String, Long> result = Maps.newLinkedHashMap();
getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
- System.out.println();
- System.out.println();
-
- for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) {
+ Map<String, Long> sortedByValues = sortByValues(result);
+ for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
List<String> record = Lists.newLinkedList();
record.add(entry.getKey());
record.add(entry.getValue() + "");
csvResult.addRecord(record.toArray(new String[record.size()]));
- System.out.println(entry.getKey() + ", " + entry.getValue());
+ }
+
+ String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+ try {
+ List<String> criticalVertices = null;
+ if (!sortedByValues.isEmpty()) {
+ String criticalPath = sortedByValues.keySet().iterator().next();
+ criticalVertices = getVertexNames(criticalPath);
+ } else {
+ criticalVertices = Lists.newLinkedList();
+ }
+ Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+ } catch (IOException e) {
+ throw new TezException(e);
}
}
@@ -98,7 +122,7 @@ public class CriticalPathAnalyzer implements Analyzer {
if (dest != null) {
time += dest.getTimeTaken();
- predecessor += destVertexName + "-->";
+ predecessor += destVertexName + CONNECTOR;
for (VertexInfo incomingVertex : dest.getInputVertices()) {
getCriticalPath(predecessor, incomingVertex, time, result);
@@ -107,4 +131,12 @@ public class CriticalPathAnalyzer implements Analyzer {
result.put(predecessor, time);
}
}
+
+ private static List<String> getVertexNames(String criticalPath) {
+ if (Strings.isNullOrEmpty(criticalPath)) {
+ return Lists.newLinkedList();
+ }
+ return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
+ (criticalPath));
+ }
}
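A minimal usage sketch (not part of this commit) of the new dot-file output added above. It assumes the caller has already obtained a DagInfo from the history parser; the wrapper class name and the output path are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;

public class CriticalPathSketch {
  public static void run(DagInfo dagInfo) throws TezException {
    Configuration conf = new Configuration();
    // directory for the generated <dagId>.dot file; defaults to the current directory
    conf.set("tez.critical-path.analyzer.dot.output.loc", "/tmp/critical-path");
    CriticalPathAnalyzer analyzer = new CriticalPathAnalyzer(conf);
    // records the critical path in the CSV result and writes the DAG visualization .dot file
    analyzer.analyze(dagInfo);
  }
}

The resulting .dot file can then be rendered with any Graphviz-compatible tool.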
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
index 67b4c51..7ed52da 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -108,7 +108,8 @@ public class LocalityAnalyzer implements Analyzer {
record.add(otherTaskResult.avgRuntime + "");
//Get the number of inputs to this vertex
- record.add(vertexInfo.getInputEdges().size() + "");
+ record.add(vertexInfo.getInputEdges().size()
+ + vertexInfo.getAdditionalInputInfoList().size() + "");
//Get the avg HDFS bytes read in this vertex for different type of locality
record.add(dataLocalResult.avgHDFSBytesRead + "");