You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2015/12/07 23:42:55 UTC
[2/5] tez git commit: TEZ-2973. Backport Analyzers to branch-0.7
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
new file mode 100644
index 0000000..ec72df1
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Get locality information for tasks for vertices and get their task execution times.
+ * This would be helpful to correlate whether the vertex runtime is in any way related to the
+ * data locality.
+ */
+public class LocalityAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  /** CSV column names, in the order in which analyze() fills each record. */
+  private static final String[] headers = { "vertexName", "numTasks", "dataLocalRatio",
+      "rackLocalRatio", "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime",
+      "avgOtherLocalTaskRuntime", "noOfInputs", "avgHDFSBytesRead_DataLocal",
+      "avgHDFSBytesRead_RackLocal", "avgHDFSBytesRead_Others", "recommendation" };
+
+  /**
+   * Config key for the data-local ratio threshold. A vertex whose data-local ratio is below
+   * this value gets a tuning recommendation in its record.
+   */
+  private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio";
+  private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f;
+
+  /** Counter group name under which the locality counters are looked up. */
+  private static final String DAG_COUNTER_GROUP = DAGCounter.class.getName();
+
+  private final Configuration config;
+
+  private final CSVResult csvResult;
+
+  public LocalityAnalyzer(Configuration config) {
+    this.config = config;
+    csvResult = new CSVResult(headers);
+  }
+
+  /**
+   * Adds one CSV record for every vertex that reports at least one data-local or rack-local
+   * task. Vertices for which both counters are absent or zero are skipped.
+   *
+   * @param dagInfo DAG whose vertices are inspected
+   * @throws TezException declared by the Analyzer contract; not thrown directly here
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      long dataLocalTasks = dagCounterValue(vertexInfo.getCounter(DAG_COUNTER_GROUP,
+          DAGCounter.DATA_LOCAL_TASKS.toString()));
+      long rackLocalTasks = dagCounterValue(vertexInfo.getCounter(DAG_COUNTER_GROUP,
+          DAGCounter.RACK_LOCAL_TASKS.toString()));
+
+      //Nothing to report when the vertex has no locality information at all.
+      if (dataLocalTasks <= 0 && rackLocalTasks <= 0) {
+        continue;
+      }
+
+      long totalVertexTasks = vertexInfo.getNumTasks();
+
+      //compute locality details.
+      float dataLocalRatio = dataLocalTasks * 1.0f / totalVertexTasks;
+      float rackLocalRatio = rackLocalTasks * 1.0f / totalVertexTasks;
+      float othersRatio = (totalVertexTasks - (dataLocalTasks + rackLocalTasks)) * 1.0f
+          / totalVertexTasks;
+
+      TaskAttemptDetails dataLocalResult = computeAverages(vertexInfo,
+          DAGCounter.DATA_LOCAL_TASKS);
+      TaskAttemptDetails rackLocalResult = computeAverages(vertexInfo,
+          DAGCounter.RACK_LOCAL_TASKS);
+      TaskAttemptDetails otherTaskResult = computeAverages(vertexInfo,
+          DAGCounter.OTHER_LOCAL_TASKS);
+
+      //The order of the additions below has to match the order of the headers.
+      List<String> record = Lists.newArrayList();
+      record.add(vertexInfo.getVertexName());
+      record.add(String.valueOf(totalVertexTasks));
+      record.add(String.valueOf(dataLocalRatio));
+      record.add(String.valueOf(rackLocalRatio));
+      record.add(String.valueOf(othersRatio));
+
+      record.add(String.valueOf(dataLocalResult.avgRuntime));
+      record.add(String.valueOf(rackLocalResult.avgRuntime));
+      record.add(String.valueOf(otherTaskResult.avgRuntime));
+
+      //Get the number of inputs to this vertex
+      record.add(String.valueOf(vertexInfo.getInputEdges().size()
+          + vertexInfo.getAdditionalInputInfoList().size()));
+
+      //Get the avg HDFS bytes read in this vertex for different type of locality
+      record.add(String.valueOf(dataLocalResult.avgHDFSBytesRead));
+      record.add(String.valueOf(rackLocalResult.avgHDFSBytesRead));
+      record.add(String.valueOf(otherTaskResult.avgHDFSBytesRead));
+
+      String recommendation = "";
+      if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) {
+        recommendation = "Data locality is poor for this vertex. Try tuning "
+            + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", "
+            + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", "
+            + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED;
+      }
+      record.add(recommendation);
+
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+  }
+
+  /**
+   * Reads the value stored under the DAGCounter group in a counter lookup result.
+   *
+   * @param countersByGroup lookup result keyed by counter group name; may be null or empty
+   * @return the counter value, or 0 when the map is null or has no DAGCounter group entry
+   */
+  private static long dagCounterValue(Map<String, TezCounter> countersByGroup) {
+    if (countersByGroup == null) {
+      return 0;
+    }
+    TezCounter counter = countersByGroup.get(DAG_COUNTER_GROUP);
+    return (counter == null) ? 0 : counter.getValue();
+  }
+
+  /**
+   * Compute the average runtime and the average HDFS bytes read over those task attempts of
+   * a vertex for which the given locality counter is greater than zero.
+   *
+   * @param vertexInfo vertex whose task attempts are scanned
+   * @param counter locality counter that selects the task attempts
+   * @return task attempt details; both averages stay 0 when no attempt matches
+   */
+  private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter counter) {
+    long totalTime = 0;
+    long totalTasks = 0;
+    long totalHDFSBytesRead = 0;
+
+    for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+      long localityValue = dagCounterValue(attemptInfo.getCounter(DAG_COUNTER_GROUP,
+          counter.toString()));
+      if (localityValue <= 0) {
+        continue;
+      }
+
+      totalTime += attemptInfo.getTimeTaken();
+      totalTasks++;
+
+      //get HDFSBytes read counter; every group entry returned by the lookup is summed up
+      Map<String, TezCounter> hdfsBytesReadCounter = attemptInfo.getCounter(
+          FileSystemCounter.class.getName(), FileSystemCounter.HDFS_BYTES_READ.name());
+      for (TezCounter bytesRead : hdfsBytesReadCounter.values()) {
+        totalHDFSBytesRead += bytesRead.getValue();
+      }
+    }
+
+    TaskAttemptDetails result = new TaskAttemptDetails();
+    if (totalTasks > 0) {
+      result.avgRuntime = (totalTime * 1.0f / totalTasks);
+      result.avgHDFSBytesRead = (totalHDFSBytesRead * 1.0f / totalTasks);
+    }
+    return result;
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Locality Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze for locality information (data local, rack local, off-rack)";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Placeholder for task attempt details
+   */
+  static class TaskAttemptDetails {
+    float avgHDFSBytesRead;
+    float avgRuntime;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    LocalityAnalyzer analyzer = new LocalityAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
new file mode 100644
index 0000000..57e91c6
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Analyze the time taken by merge phase, shuffle phase, time taken to do realistic work etc in
+ * tasks.
+ *
+ * Just dump REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, its ratio and SHUFFLE_BYTES for tasks
+ * grouped by vertices. Provide time taken as well. Just render it as a table for now.
+ *
+ */
+public class ShuffleTimeAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+ /**
+ * ratio of (total time taken by task - shuffle time) / (total time taken by task)
+ */
+ private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio";
+ private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f;
+
+ /**
+ * Number of min records that needs to get in as reduce input records.
+ */
+ private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
+ private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
+
+ private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
+ "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
+ "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
+ "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
+ "SHUFFLE_BYTES_DISK_DIRECT" };
+
+ private final CSVResult csvResult = new CSVResult(headers);
+
+ private final Configuration config;
+
+ private final float realWorkDoneRatio;
+ private final long minShuffleRecords;
+
+
+ public ShuffleTimeAnalyzer(Configuration config) {
+ this.config = config;
+
+ realWorkDoneRatio = config.getFloat
+ (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
+ minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+ //counter_group (basically source) --> counter
+ Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_GROUPS.toString());
+ Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_RECORDS.toString());
+
+ if (reduceInputGroups == null) {
+ continue;
+ }
+
+ for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+ String counterGroupName = entry.getKey();
+ long reduceInputGroupsVal = entry.getValue().getValue();
+ long reduceInputRecordsVal = (reduceInputRecords.get(counterGroupName) != null) ?
+ reduceInputRecords.get(counterGroupName).getValue() : 0;
+
+ if (reduceInputRecordsVal <= 0) {
+ continue;
+ }
+ float ratio = (reduceInputGroupsVal * 1.0f / reduceInputRecordsVal);
+
+ if (ratio > 0 && reduceInputRecordsVal > minShuffleRecords) {
+ List<String> result = Lists.newLinkedList();
+ result.add(vertexInfo.getVertexName());
+ result.add(attemptInfo.getTaskAttemptId());
+ result.add(attemptInfo.getNodeId());
+ result.add(counterGroupName);
+
+ //Real work done in the task
+ String comments = "";
+ String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME,
+ counterGroupName, attemptInfo);
+ String timeTakenForRealWork = "";
+ if (!Strings.isNullOrEmpty(mergePhaseTime)) {
+ long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime);
+
+ if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio) {
+ comments = "Time taken in shuffle is more than the actual work being done in task. "
+ + " Check if source/destination machine is a slow node. Check if merge phase "
+ + "time is more to understand disk bottlenecks in this node. Check for skew";
+ }
+
+ timeTakenForRealWork = Long.toString(realWorkDone);
+ }
+ result.add(comments);
+
+ result.add(reduceInputGroupsVal + "");
+ result.add(reduceInputRecordsVal + "");
+ result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal));
+ result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));
+
+ result.add(Long.toString(attemptInfo.getTimeTaken()));
+
+ //Total time taken for receiving all events from source tasks
+ result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
+ result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
+ result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo));
+
+ result.add(timeTakenForRealWork);
+
+ result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo));
+ result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo));
+ result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, counterGroupName, attemptInfo));
+
+ csvResult.addRecord(result.toArray(new String[result.size()]));
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Time taken to receive all events from source tasks
+ *
+ * @param counterGroupName
+ * @param attemptInfo
+ * @return String
+ */
+ private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
+ String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+ counterGroupName, attemptInfo);
+ String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+ counterGroupName, attemptInfo);
+
+ if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived)) {
+ return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived));
+ } else {
+ return "";
+ }
+ }
+
+ private String getCounterValue(TaskCounter counter, String counterGroupName,
+ TaskAttemptInfo attemptInfo) {
+ Map<String, TezCounter> tezCounterMap = attemptInfo.getCounter(counter.toString());
+ if (tezCounterMap != null) {
+ for (Map.Entry<String, TezCounter> entry : tezCounterMap.entrySet()) {
+ String groupName = entry.getKey();
+ long val = entry.getValue().getValue();
+ if (groupName.equals(counterGroupName)) {
+ return Long.toString(val);
+ }
+ }
+ }
+ return "";
+ }
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Shuffle time analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Analyze the time taken for shuffle, merge "
+ + "and the real work done in the task";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
new file mode 100644
index 0000000..067d871
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * <p/>
+ * Identify the skew (REDUCE_INPUT_GROUPS / REDUCE_INPUT_RECORDS) ratio for all task attempts
+ * and report if they are below a certain threshold.
+ * <p/>
+ * <p/>
+ * - Case 1: Ratio of (reduce_input_groups / reduce_input_records) < 0.2 && SHUFFLE_BYTES > 1 GB
+ * per task attempt from a source. This means couple of keys having too many records. Either
+ * partitioning is wrong, or we need to increase memory limit for this vertex.
+ * <p/>
+ * - Case 2: Ratio of (reduce_input_groups / reduce_input_records) > 0.6 & Number of reduce input
+ * records in task attempt is closer to say 60% of overall number of records
+ * in vertex level & numTasks in vertex is greater than 1. This might have any number of reducer
+ * groups. This means that, partitioning is wrong (can also consider reducing number of tasks
+ * for that vertex). In some cases, too many reducers are launched and this can help find those.
+ * <p/>
+ * - Case 3: Ratio of (reduce_input_groups / reduce_input_records) is between 0.2 & 0.6 per task
+ * attempt & numTasks is greater than 1 & SHUFFLE_BYTES > 1 GB per task attempt from a
+ * source. This means, may be consider increasing parallelism based on the task attempt runtime.
+ * <p/>
+ */
+public class SkewAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+ /**
+ * Amount of bytes that was sent as shuffle bytes from source. If it is below this threshold,
+ * it would not be considered for analysis.
+ */
+ private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
+ + ".bytes.per.source";
+ private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;
+
+ //Min reducer input group : reducer keys ratio for computation
+ private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO = "tez.skew-analyzer.shuffle.key"
+ + ".group.min.ratio";
+ private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT = 0.2f;
+
+ //Max reducer input group : reducer keys ratio for computation
+ private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO = "tez.skew-analyzer.shuffle.key"
+ + ".group.max.ratio";
+ private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT = 0.4f;
+
+
+
+ private static final String[] headers = { "vertexName", "taskAttemptId", "counterGroup", "node",
+ "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", "timeTaken",
+ "observation" };
+
+ private final CSVResult csvResult = new CSVResult(headers);
+
+ private final Configuration config;
+
+ private final float minRatio;
+ private final float maxRatio;
+ private final long maxShuffleBytesPerSource;
+
+ public SkewAnalyzer(Configuration config) {
+ this.config = config;
+ maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO,
+ ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT);
+ minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO,
+ ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT);
+ maxShuffleBytesPerSource = config.getLong(SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE,
+ SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ Preconditions.checkArgument(dagInfo != null, "DAG can't be null");
+ analyzeReducers(dagInfo);
+ }
+
+ private void analyzeReducers(DagInfo dagInfo) {
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+ analyzeGroupSkewPerSource(attemptInfo);
+ analyzeRecordSkewPerSource(attemptInfo);
+ analyzeForParallelism(attemptInfo);
+ }
+ }
+ }
+
+ /**
+ * Analyze scenario where couple keys are having too many records per source
+ *
+ * @param attemptInfo
+ */
+ private void analyzeGroupSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+ //counter_group (basically source) --> counter
+ Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_GROUPS.toString());
+ Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_RECORDS.toString());
+ Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+
+ //tez counter for every source
+ for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+ if (entry.getKey().equals(TaskCounter.class.getName())) {
+ //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+ // getting TaskCounter details as well.
+ continue;
+ }
+
+ String counterGroup = entry.getKey();
+ long inputGroupsCount = entry.getValue().getValue();
+ long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+ .get(counterGroup).getValue() : 0;
+ long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get
+ (counterGroup).getValue() : 0;
+
+ float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+ //Case 1: Couple of keys having too many records per source.
+ if (shuffleBytesPerSource > maxShuffleBytesPerSource) {
+ if (ratio < minRatio) {
+ List<String> result = Lists.newLinkedList();
+ result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+ result.add(attemptInfo.getTaskAttemptId());
+ result.add(counterGroup);
+ result.add(attemptInfo.getNodeId());
+ result.add(inputGroupsCount + "");
+ result.add(inputRecordsCount + "");
+ result.add(ratio + "");
+ result.add(shuffleBytesPerSource + "");
+ result.add(attemptInfo.getTimeTaken() + "");
+ result.add("Please check partitioning. Otherwise consider increasing memLimit");
+
+ csvResult.addRecord(result.toArray(new String[result.size()]));
+ }
+ }
+ }
+ }
+
+ /**
+ * Analyze scenario where one task is getting > 60% of the vertex level records
+ *
+ * @param attemptInfo
+ */
+ private void analyzeRecordSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+ Map<String, TezCounter> vertexLevelReduceInputRecords =
+ attemptInfo.getTaskInfo().getVertexInfo()
+ .getCounter(TaskCounter.REDUCE_INPUT_RECORDS.toString());
+
+ int vertexNumTasks = attemptInfo.getTaskInfo().getVertexInfo().getNumTasks();
+
+ //counter_group (basically source) --> counter
+ Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_GROUPS.toString());
+ Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_RECORDS.toString());
+ Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+
+ //tez counter for every source
+ for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+ if (entry.getKey().equals(TaskCounter.class.getName())) {
+ //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+ // getting TaskCounter details as well.
+ continue;
+ }
+
+ String counterGroup = entry.getKey();
+ long inputGroupsCount = entry.getValue().getValue();
+ long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+ .get(counterGroup).getValue() : 0;
+ long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ?shuffleBytes.get
+ (counterGroup).getValue() : 0;
+ long vertexLevelInputRecordsCount = (vertexLevelReduceInputRecords.get(counterGroup) !=
+ null) ?
+ vertexLevelReduceInputRecords.get(counterGroup).getValue() : 0;
+
+ float ratio = (inputRecordsCount * 1.0f / vertexLevelInputRecordsCount);
+
+ if (vertexNumTasks > 1) {
+ if (ratio > maxRatio) {
+ //input records > 60% of vertex level record count
+ if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) {
+ List<String> result = Lists.newLinkedList();
+ result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+ result.add(attemptInfo.getTaskAttemptId());
+ result.add(counterGroup);
+ result.add(attemptInfo.getNodeId());
+ result.add(inputGroupsCount + "");
+ result.add(inputRecordsCount + "");
+ result.add(ratio + "");
+ result.add(shuffleBytesPerSource + "");
+ result.add(attemptInfo.getTimeTaken() + "");
+ result.add("Some task attempts are getting > 60% of reduce input records. "
+ + "Consider adjusting parallelism & check partition logic");
+
+ csvResult.addRecord(result.toArray(new String[result.size()]));
+
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Analyze scenario where a vertex would need to increase parallelism
+ *
+ * @param attemptInfo
+ */
+ private void analyzeForParallelism(TaskAttemptInfo attemptInfo) {
+
+ //counter_group (basically source) --> counter
+ Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_GROUPS.toString());
+ Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
+ .REDUCE_INPUT_RECORDS.toString());
+ Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());
+
+ //tez counter for every source
+ for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+ if (entry.getKey().equals(TaskCounter.class.getName())) {
+ //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+ // getting TaskCounter details as well.
+ continue;
+ }
+
+ String counterGroup = entry.getKey();
+ long inputGroupsCount = entry.getValue().getValue();
+ long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords
+ .get(counterGroup).getValue() : 0;
+ long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get
+ (counterGroup).getValue() : 0;
+
+ float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+ //Case 3: Shuffle_Bytes > 1 GB. Ratio between 0.2 & < 0.6. Consider increasing
+ // parallelism based on task runtime.
+ if (shuffleBytesPerSource > SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT) {
+ if (ratio > minRatio && ratio < maxRatio) {
+ //couple of keys have too many records. Classic case of partition issue.
+ List<String> result = Lists.newLinkedList();
+ result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+ result.add(attemptInfo.getTaskAttemptId());
+ result.add(counterGroup);
+ result.add(attemptInfo.getNodeId());
+ result.add(inputGroupsCount + "");
+ result.add(inputRecordsCount + "");
+ result.add(ratio + "");
+ result.add(shuffleBytesPerSource + "");
+ result.add(attemptInfo.getTimeTaken() + "");
+ result.add("Consider increasing parallelism.");
+
+ csvResult.addRecord(result.toArray(new String[result.size()]));
+ }
+ }
+ }
+
+
+ }
+
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Skew Analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Analyzer reducer skews by mining reducer task counters";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ SkewAnalyzer analyzer = new SkewAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
new file mode 100644
index 0000000..a810a8a
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This will provide the set of nodes participated in the DAG in descending order of task execution
+ * time.
+ * <p/>
+ * Combine it with other counters to understand slow nodes better.
+ */
+public class SlowNodeAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+ private static final Log LOG = LogFactory.getLog(SlowNodeAnalyzer.class);
+
+ private static final String[] headers = { "nodeName", "noOfTasksExecuted", "noOfKilledTasks",
+ "noOfFailedTasks", "avgSucceededTaskExecutionTime", "avgKilledTaskExecutionTime",
+ "avgFailedTaskExecutionTime", "avgHDFSBytesRead", "avgHDFSBytesWritten",
+ "avgFileBytesRead", "avgFileBytesWritten", "avgGCTimeMillis", "avgCPUTimeMillis" };
+
+ private final CSVResult csvResult = new CSVResult(headers);
+
+ private final Configuration config;
+
+ public SlowNodeAnalyzer(Configuration config) {
+ this.config = config;
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ Multimap<String, TaskAttemptInfo> nodeDetails = dagInfo.getNodeDetails();
+ for (String nodeName : nodeDetails.keySet()) {
+ List<String> record = Lists.newLinkedList();
+
+ Collection<TaskAttemptInfo> taskAttemptInfos = nodeDetails.get(nodeName);
+
+ record.add(nodeName);
+ record.add(taskAttemptInfos.size() + "");
+ record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.KILLED) + "");
+ record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.FAILED) + "");
+
+ Iterable<TaskAttemptInfo> succeedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+ TaskAttemptState.SUCCEEDED);
+ record.add(getAvgTaskExecutionTime(succeedTasks) + "");
+
+ Iterable<TaskAttemptInfo> killedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+ TaskAttemptState.KILLED);
+ record.add(getAvgTaskExecutionTime(killedTasks) + "");
+
+ Iterable<TaskAttemptInfo> failedTasks = getFilteredTaskAttempts(taskAttemptInfos,
+ TaskAttemptState.FAILED);
+ record.add(getAvgTaskExecutionTime(failedTasks) + "");
+
+ record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+ .getName(), FileSystemCounter.HDFS_BYTES_READ.name()) + "");
+ record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+ .getName(), FileSystemCounter.HDFS_BYTES_WRITTEN.name()) + "");
+ record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+ .getName(), FileSystemCounter.FILE_BYTES_READ.name()) + "");
+ record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+ .getName(), FileSystemCounter.FILE_BYTES_WRITTEN.name()) + "");
+ record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+ .getName(), TaskCounter.GC_TIME_MILLIS.name()) + "");
+ record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+ .getName(), TaskCounter.CPU_MILLISECONDS.name()) + "");
+
+ csvResult.addRecord(record.toArray(new String[record.size()]));
+ }
+ }
+
+ private Iterable<TaskAttemptInfo> getFilteredTaskAttempts(Collection<TaskAttemptInfo>
+ taskAttemptInfos, final TaskAttemptState status) {
+ return Iterables.filter(taskAttemptInfos, new
+ Predicate<TaskAttemptInfo>() {
+ @Override public boolean apply(TaskAttemptInfo input) {
+ return input.getStatus().equalsIgnoreCase(status.toString());
+ }
+ });
+ }
+
+ private float getAvgTaskExecutionTime(Iterable<TaskAttemptInfo> taskAttemptInfos) {
+ long totalTime = 0;
+ int size = 0;
+ for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+ totalTime += attemptInfo.getTimeTaken();
+ size++;
+ }
+ return (size > 0) ? (totalTime * 1.0f / size) : 0;
+ }
+
+ private int getNumberOfTasks(Collection<TaskAttemptInfo> taskAttemptInfos, TaskAttemptState
+ status) {
+ int tasks = 0;
+ for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+ if (attemptInfo.getStatus().equalsIgnoreCase(status.toString())) {
+ tasks++;
+ }
+ }
+ return tasks;
+ }
+
+ private float getAvgCounter(Collection<TaskAttemptInfo> taskAttemptInfos, String
+ counterGroupName, String counterName) {
+ long total = 0;
+ int taskCount = 0;
+ for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+ TezCounters tezCounters = attemptInfo.getTezCounters();
+ TezCounter counter = tezCounters.findCounter(counterGroupName, counterName);
+ if (counter != null) {
+ total += counter.getValue();
+ taskCount++;
+ } else {
+ LOG.info("Could not find counterGroupName=" + counterGroupName + ", counter=" +
+ counterName + " in " + attemptInfo);
+ }
+ }
+ return (taskCount > 0) ? (total * 1.0f / taskCount) : 0;
+ }
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Slow Node Analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Analyze node details for the DAG.").append("\n");
+ sb.append("This could be used to find out the set of nodes where the tasks are taking more "
+ + "time on average.").append("\n");
+ sb.append("This could be used to find out the set of nodes where the tasks are taking more "
+ + "time on average and to understand whether too many tasks got scheduled on a node.")
+ .append("\n");
+ sb.append("One needs to combine the task execution time with other metrics like bytes "
+ + "read/written etc to get better idea of bad nodes. In order to understand the slow "
+ + "nodes due to network, it might be worthwhile to consider the shuffle performance "
+ + "analyzer tool in tez-tools").append("\n");
+ return sb.toString();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ SlowNodeAnalyzer analyzer = new SlowNodeAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
new file mode 100644
index 0000000..d2474ad
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * Analyze slow tasks in the DAG. Top 100 tasks are listed by default.
+ *
+ * <p/>
+ * //TODO: We do not get counters for killed task attempts yet.
+ */
+public class SlowTaskIdentifier extends TezAnalyzerBase implements Analyzer {
+
+ private static final String[] headers = { "vertexName", "taskAttemptId",
+ "Node", "taskDuration", "Status", "diagnostics",
+ "NoOfInputs" };
+
+ private final CSVResult csvResult;
+
+ private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count";
+ private static final int NO_OF_TASKS_DEFAULT = 100;
+
+ private final Configuration config;
+
+ public SlowTaskIdentifier(Configuration config) {
+ this.config = config;
+ this.csvResult = new CSVResult(headers);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ List<TaskAttemptInfo> taskAttempts = Lists.newArrayList();
+ for(VertexInfo vertexInfo : dagInfo.getVertices()) {
+ taskAttempts.addAll(vertexInfo.getTaskAttempts());
+ }
+
+ //sort them by runtime in descending order
+ Collections.sort(taskAttempts, new Comparator<TaskAttemptInfo>() {
+ @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+ return (o1.getTimeTaken() > o2.getTimeTaken()) ? -1 :
+ ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+ 0 : 1);
+ }
+ });
+
+ int limit = Math.min(taskAttempts.size(),
+ Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+
+ if (limit == 0) {
+ return;
+ }
+
+ for (int i = 0; i < limit - 1; i++) {
+ List<String> record = Lists.newLinkedList();
+ record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName());
+ record.add(taskAttempts.get(i).getTaskAttemptId());
+ record.add(taskAttempts.get(i).getContainer().getHost());
+ record.add(taskAttempts.get(i).getTimeTaken() + "");
+ record.add(taskAttempts.get(i).getStatus());
+ record.add(taskAttempts.get(i).getDiagnostics());
+ record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + "");
+
+ csvResult.addRecord(record.toArray(new String[record.size()]));
+ }
+
+ }
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Slow Task Identifier";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Identifies slow tasks in the DAG";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ SlowTaskIdentifier analyzer = new SlowTaskIdentifier(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
new file mode 100644
index 0000000..33f2421
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify the slowest vertex in the DAG.
+ */
+public class SlowestVertexAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+ private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
+ "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom",
+ "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+ "observation", "comments" };
+
+ private final CSVResult csvResult = new CSVResult(headers);
+
+ private final Configuration config;
+ private final MetricRegistry metrics = new MetricRegistry();
+ private Histogram taskAttemptRuntimeHistorgram;
+
+ private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime";
+ private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000;
+
+ private final long vertexRuntimeThreshold;
+
+ public SlowestVertexAnalyzer(Configuration config) {
+ this.config = config;
+ this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
+ MAX_VERTEX_RUNTIME_DEFAULT));
+
+ }
+
+ private long getTaskRuntime(VertexInfo vertexInfo) {
+ TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart();
+ TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish();
+
+ DagInfo dagInfo = vertexInfo.getDagInfo();
+ long totalTime = ((lastTaskToFinish == null) ?
+ dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) -
+ ((firstTaskToStart == null) ? dagInfo.getStartTime() : firstTaskToStart.getStartTime());
+ return totalTime;
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ String vertexName = vertexInfo.getVertexName();
+ if (vertexInfo.getFirstTaskToStart() == null || vertexInfo.getLastTaskToFinish() == null) {
+ continue;
+ }
+
+ long totalTime = getTaskRuntime(vertexInfo);
+
+ long slowestLastEventTime = Long.MIN_VALUE;
+ String maxSourceName = "";
+ taskAttemptRuntimeHistorgram = metrics.histogram(vertexName);
+
+
+ for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+
+ taskAttemptRuntimeHistorgram.update(attemptInfo.getTimeTaken());
+
+ //Get the last event received from the incoming vertices
+ Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter(
+ TaskCounter.LAST_EVENT_RECEIVED.toString());
+
+ for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) {
+ if (entry.getKey().equals(TaskCounter.class.getName())) {
+ //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+ // getting TaskCounter details as well.
+ continue;
+ }
+ //Find the slowest last event received
+ if (entry.getValue().getValue() > slowestLastEventTime) {
+ slowestLastEventTime = entry.getValue().getValue();
+ maxSourceName = entry.getKey();
+ }
+ }
+ }
+
+ long shuffleMax = Long.MIN_VALUE;
+ String shuffleMaxSource = "";
+ for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+ //Get the last event received from the incoming vertices
+ Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter(
+ TaskCounter.SHUFFLE_PHASE_TIME.toString());
+
+ for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) {
+ if (entry.getKey().equals(TaskCounter.class.getName())) {
+ //ignore. TODO: hack for taskcounter issue
+ continue;
+ }
+ //Find the slowest last event received
+ if (entry.getValue().getValue() > shuffleMax) {
+ shuffleMax = entry.getValue().getValue();
+ shuffleMaxSource = entry.getKey();
+ }
+ }
+ }
+
+ String comments = "";
+
+ List<String> record = Lists.newLinkedList();
+ record.add(vertexName);
+ record.add(vertexInfo.getTaskAttempts().size() + "");
+ record.add(totalTime + "");
+ record.add(Math.max(0, shuffleMax) + "");
+ record.add(shuffleMaxSource);
+ record.add(Math.max(0, slowestLastEventTime) + "");
+ record.add(maxSourceName);
+ //Finding out real_work done at vertex level might be meaningless (as it is quite posisble
+ // that it went to starvation).
+
+ StringBuilder sb = new StringBuilder();
+ double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
+ double percentile95 = taskAttemptRuntimeHistorgram.getSnapshot().get95thPercentile();
+ double percentile98 = taskAttemptRuntimeHistorgram.getSnapshot().get98thPercentile();
+ double percentile99 = taskAttemptRuntimeHistorgram.getSnapshot().get99thPercentile();
+ double medianAttemptRuntime = taskAttemptRuntimeHistorgram.getSnapshot().getMedian();
+
+ record.add("75th=" + percentile75);
+ record.add("95th=" + percentile95);
+ record.add("98th=" + percentile98);
+ record.add("median=" + medianAttemptRuntime);
+
+ if (percentile75 / percentile99 < 0.5) {
+ //looks like some straggler task is there.
+ sb.append("Looks like some straggler task is there");
+ }
+
+ record.add(sb.toString());
+
+ if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
+ if ((shuffleMax * 1.0f / totalTime) > 0.5) {
+ if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) {
+ comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
+ + " event received";
+ } else {
+ comments =
+ "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
+ }
+ } else {
+ if (totalTime > vertexRuntimeThreshold) { //greater than X seconds.
+ comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold
+ + " seconds)";
+ }
+ }
+ }
+
+ record.add(comments);
+ csvResult.addRecord(record.toArray(new String[record.size()]));
+ }
+ }
+
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "SlowVertexAnalyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Identify the slowest vertex in the DAG, which needs to be looked into first";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
new file mode 100644
index 0000000..d69ca23
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Find out tasks which have more than 1 spill (ADDITIONAL_SPILL_COUNT).
+ * <p/>
+ * Accompany this with OUTPUT_BYTES (> 1 GB data written)
+ */
+public class SpillAnalyzerImpl extends TezAnalyzerBase implements Analyzer {
+
+ private static final String[] headers = { "vertexName", "taskAttemptId",
+ "Node", "counterGroupName",
+ "spillCount", "taskDuration",
+ "OUTPUT_BYTES", "OUTPUT_RECORDS",
+ "SPILLED_RECORDS", "Recommendation" };
+
+ private final CSVResult csvResult;
+
+ /**
+ * Minimum output bytes that should be chunrned out by a task
+ */
+ private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes"
+ + ".threshold";
+ private static long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1 * 1024 * 1024 * 1024l;
+
+ private final long minOutputBytesPerTask;
+
+ private final Configuration config;
+
+ public SpillAnalyzerImpl(Configuration config) {
+ this.config = config;
+ minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
+ OUTPUT_BYTES_THRESHOLD_DEFAULT));
+ this.csvResult = new CSVResult(headers);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ String vertexName = vertexInfo.getVertexName();
+
+ for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+ //Get ADDITIONAL_SPILL_COUNT, OUTPUT_BYTES for every source
+ Map<String, TezCounter> spillCountMap =
+ attemptInfo.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name());
+ Map<String, TezCounter> spilledRecordsMap =
+ attemptInfo.getCounter(TaskCounter.SPILLED_RECORDS.name());
+ Map<String, TezCounter> outputRecordsMap =
+ attemptInfo.getCounter(TaskCounter.OUTPUT_RECORDS.name());
+
+ Map<String, TezCounter> outputBytesMap =
+ attemptInfo.getCounter(TaskCounter.OUTPUT_BYTES.name());
+
+ for (Map.Entry<String, TezCounter> entry : spillCountMap.entrySet()) {
+ String source = entry.getKey();
+ long spillCount = entry.getValue().getValue();
+ long outBytes = outputBytesMap.get(source).getValue();
+
+ long outputRecords = outputRecordsMap.get(source).getValue();
+ long spilledRecords = spilledRecordsMap.get(source).getValue();
+
+ if (spillCount > 1 && outBytes > minOutputBytesPerTask) {
+ List<String> recorList = Lists.newLinkedList();
+ recorList.add(vertexName);
+ recorList.add(attemptInfo.getTaskAttemptId());
+ recorList.add(attemptInfo.getNodeId());
+ recorList.add(source);
+ recorList.add(spillCount + "");
+ recorList.add(attemptInfo.getTimeTaken() + "");
+ recorList.add(outBytes + "");
+ recorList.add(outputRecords + "");
+ recorList.add(spilledRecords + "");
+ recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
+ + ". Try increasing container size.");
+
+ csvResult.addRecord(recorList.toArray(new String[recorList.size()]));
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "SpillAnalyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Analyze spill details in the task";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
new file mode 100644
index 0000000..070294f
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.TreeMultiset;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Analyze concurrent tasks running in every vertex at regular intervals.
+ */
+public class TaskConcurrencyAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+ private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
+
+ private final CSVResult csvResult;
+ private final Configuration config;
+
+ public TaskConcurrencyAnalyzer(Configuration conf) {
+ this.csvResult = new CSVResult(headers);
+ this.config = conf;
+ }
+
+ private enum EventType {START, FINISH}
+
+ static class TimeInfo {
+ EventType eventType;
+ long timestamp;
+ int concurrentTasks;
+
+ public TimeInfo(EventType eventType, long timestamp) {
+ this.eventType = eventType;
+ this.timestamp = timestamp;
+ }
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+
+ //For each vertex find the concurrent tasks running at any point
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ List<TaskAttemptInfo> taskAttempts =
+ Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null));
+
+ String vertexName = vertexInfo.getVertexName();
+
+ /**
+ * - Get sorted multi-set of timestamps (S1, S2,...E1, E2..). Possible to have multiple
+ * tasks starting/ending at same time.
+ * - Walk through the set
+ * - Increment concurrent tasks when start event is encountered
+ * - Decrement concurrent tasks when start event is encountered
+ */
+ TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() {
+ @Override public int compare(TimeInfo o1, TimeInfo o2) {
+ return (o1.timestamp < o2.timestamp) ? -1 :
+ ((o1.timestamp == o2.timestamp) ? 0 : 1);
+ }
+ });
+
+ for (TaskAttemptInfo attemptInfo : taskAttempts) {
+ TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime());
+ TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime());
+
+ timeInfoSet.add(startTimeInfo);
+ timeInfoSet.add(stopTimeInfo);
+ }
+
+ //Compute concurrent tasks in the list now.
+ int concurrentTasks = 0;
+ for(TimeInfo timeInfo : timeInfoSet.elementSet()) {
+ switch (timeInfo.eventType) {
+ case START:
+ concurrentTasks += timeInfoSet.count(timeInfo);
+ break;
+ case FINISH:
+ concurrentTasks -= timeInfoSet.count(timeInfo);
+ break;
+ default:
+ break;
+ }
+ timeInfo.concurrentTasks = concurrentTasks;
+ addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks);
+ }
+ }
+ }
+
+ private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
+ String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
+ csvResult.addRecord(record);
+ }
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "TaskConcurrencyAnalyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Analyze how many tasks were running in every vertex at given point in time. This "
+ + "would be helpful in understanding whether any starvation was there or not.";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
new file mode 100644
index 0000000..73e731a
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.ATSImportTool;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+import com.google.common.base.Preconditions;
+
+ /**
+  * Common command-line driver for the DAG analyzers in this package.
+  *
+  * <p>{@link #run(String[])} parses the command line, obtains the {@link DagInfo} for the
+  * requested DAG (from an event file named on the command line, or otherwise by downloading
+  * it with {@link ATSImportTool}), passes it to {@code analyze(DagInfo)} and, when asked to,
+  * dumps a {@link CSVResult} to a file. {@link #printResults()} prints a CSV result as a
+  * fixed-width table on standard output.
+  */
+ public abstract class TezAnalyzerBase extends Configured implements Tool, Analyzer {
+
+   // Long option names accepted on the command line.
+   private static final String EVENT_FILE_NAME = "eventFileName";
+   private static final String OUTPUT_DIR = "outputDir";
+   private static final String SAVE_RESULTS = "saveResults";
+   private static final String DAG_ID = "dagId";
+   private static final String FROM_SIMPLE_HISTORY = "fromSimpleHistory";
+   private static final String HELP = "help";
+
+   // Layout constants used by printResults().
+   private static final int SEPARATOR_WIDTH = 80;
+   private static final int MIN_COL_WIDTH = 12;
+
+   private String outputDir;
+   private boolean saveResults = false;
+
+   /**
+    * Builds the set of command-line options understood by {@link #run(String[])}.
+    * Only {@code dagId} is marked as required.
+    */
+   @SuppressWarnings("static-access")
+   private static Options buildOptions() {
+     Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+         .withDescription("DagId that needs to be analyzed").hasArg().isRequired(true).create();
+
+     Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR)
+         .withDescription("Directory to write outputs to.").hasArg().isRequired(false).create();
+
+     Option saveResultsOption = OptionBuilder.withArgName(SAVE_RESULTS).withLongOpt(SAVE_RESULTS)
+         .withDescription("Saves results to output directory (optional)")
+         .hasArg(false).isRequired(false).create();
+
+     Option eventFileNameOption = OptionBuilder.withArgName(EVENT_FILE_NAME)
+         .withLongOpt(EVENT_FILE_NAME)
+         .withDescription("File with event data for the DAG").hasArg()
+         .isRequired(false).create();
+
+     Option fromSimpleHistoryOption = OptionBuilder.withArgName(FROM_SIMPLE_HISTORY)
+         .withLongOpt(FROM_SIMPLE_HISTORY)
+         .withDescription("Event data from Simple History logging. Must also specify event file")
+         .isRequired(false).create();
+
+     Option helpOption = OptionBuilder.withArgName(HELP).withLongOpt(HELP)
+         .withDescription("print help")
+         .isRequired(false).create();
+
+     Options opts = new Options();
+     opts.addOption(dagIdOption);
+     opts.addOption(outputDirOption);
+     opts.addOption(saveResultsOption);
+     opts.addOption(eventFileNameOption);
+     opts.addOption(fromSimpleHistoryOption);
+     opts.addOption(helpOption);
+     return opts;
+   }
+
+   /** Returns the output directory chosen by the last {@link #run(String[])}, or null before that. */
+   protected String getOutputDir() {
+     return outputDir;
+   }
+
+   /** Prints every base option with its description to standard error. */
+   private void printUsage() {
+     System.err.println("Analyzer base options are");
+     Options options = buildOptions();
+     for (Object obj : options.getOptions()) {
+       Option option = (Option) obj;
+       System.err.println(option.getArgName() + " : " + option.getDescription());
+     }
+   }
+
+   /**
+    * Returns true when the raw arguments contain the help flag ({@code --help} or
+    * {@code -help}).
+    */
+   private static boolean isHelpRequested(String[] args) {
+     if (args == null) {
+       return false;
+     }
+     for (String arg : args) {
+       if (("--" + HELP).equals(arg) || ("-" + HELP).equals(arg)) {
+         return true;
+       }
+     }
+     return false;
+   }
+
+   /**
+    * Parses the command line, loads the DAG data, runs {@code analyze(DagInfo)} and
+    * optionally saves a CSV result.
+    *
+    * @param args command-line arguments
+    * @return 0 on success or when help was requested; -1 when the options cannot be parsed;
+    *         -2 when simple history is requested without an event file; -3 when the download
+    *         from ATS fails
+    * @throws Exception if parsing the event data or the analysis itself fails
+    */
+   @Override
+   public int run(String[] args) throws Exception {
+     // dagId is a required option, so the parser would reject a lone help flag with a
+     // "missing option" error. Check for help on the raw arguments before parsing.
+     if (isHelpRequested(args)) {
+       printUsage();
+       return 0;
+     }
+
+     CommandLine cmdLine = null;
+     try {
+       cmdLine = new GnuParser().parse(buildOptions(), args);
+     } catch (ParseException e) {
+       // Include the parser's reason so the user can see which option was wrong.
+       System.err.println("Invalid options on command line: " + e.getMessage());
+       printUsage();
+       return -1;
+     }
+     saveResults = cmdLine.hasOption(SAVE_RESULTS);
+
+     if (cmdLine.hasOption(HELP)) {
+       printUsage();
+       return 0;
+     }
+
+     outputDir = cmdLine.getOptionValue(OUTPUT_DIR);
+     if (outputDir == null) {
+       // Fall back to the current working directory.
+       outputDir = System.getProperty("user.dir");
+     }
+
+     File file = null;
+     if (cmdLine.hasOption(EVENT_FILE_NAME)) {
+       file = new File(cmdLine.getOptionValue(EVENT_FILE_NAME));
+     }
+
+     String dagId = cmdLine.getOptionValue(DAG_ID);
+
+     DagInfo dagInfo = null;
+
+     if (file == null) {
+       if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+         System.err.println("Event file name must be specified when using simple history");
+         printUsage();
+         return -2;
+       }
+       // using ATS - try to download directly
+       String[] importArgs = { "--dagId=" + dagId, "--downloadDir=" + outputDir };
+
+       int result = ATSImportTool.process(importArgs);
+       if (result != 0) {
+         System.err.println("Error downloading data from ATS");
+         return -3;
+       }
+
+       // The downloaded archive is then read from <outputDir>/<dagId>/<dagId>.zip
+       file = new File(outputDir
+           + Path.SEPARATOR + dagId
+           + Path.SEPARATOR + dagId + ".zip");
+     }
+
+     Preconditions.checkState(file != null);
+     if (!cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+       ATSFileParser parser = new ATSFileParser(file);
+       dagInfo = parser.getDAGData(dagId);
+     } else {
+       SimpleHistoryParser parser = new SimpleHistoryParser(file);
+       dagInfo = parser.getDAGData(dagId);
+     }
+     Preconditions.checkState(dagInfo.getDagId().equals(dagId),
+         "Parsed data is for DAG %s but %s was requested", dagInfo.getDagId(), dagId);
+     analyze(dagInfo);
+     Result result = getResult();
+     if (saveResults && (result instanceof CSVResult)) {
+       String fileName = outputDir + File.separator
+           + this.getClass().getName() + "_" + dagInfo.getDagId() + ".csv";
+       ((CSVResult) result).dumpToFile(fileName);
+       System.out.println("Saved results in " + fileName);
+     }
+     return 0;
+   }
+
+   /**
+    * Prints the analyzer's result as a left-aligned, fixed-width table on standard output.
+    * Nothing is printed when the result is not a {@link CSVResult} or has no headers.
+    *
+    * @throws TezException if the result cannot be obtained
+    */
+   public void printResults() throws TezException {
+     Result result = getResult();
+     if (!(result instanceof CSVResult)) {
+       return;
+     }
+     CSVResult csv = (CSVResult) result;
+     String[] headers = csv.getHeaders();
+     if (headers == null || headers.length == 0) {
+       // No columns to lay out; this also avoids dividing by zero below.
+       return;
+     }
+
+     // Every column gets the same width: an equal share of the line, but never below the minimum.
+     int columnWidth = Math.max(MIN_COL_WIDTH, SEPARATOR_WIDTH / headers.length);
+     StringBuilder formatBuilder = new StringBuilder();
+     for (int i = 0; i < headers.length; i++) {
+       formatBuilder.append("%-").append(columnWidth).append("s ");
+     }
+     String format = formatBuilder.toString();
+
+     StringBuilder separatorBuilder = new StringBuilder();
+     for (int i = 0; i < SEPARATOR_WIDTH; i++) {
+       separatorBuilder.append("-");
+     }
+     String separator = separatorBuilder.toString();
+
+     System.out.println(separator);
+     // Cast to Object[] so the array is spread over the format arguments.
+     System.out.println(String.format(format, (Object[]) headers));
+     System.out.println(separator);
+
+     Iterator<String[]> recordsIterator = csv.getRecordsIterator();
+     while (recordsIterator.hasNext()) {
+       System.out.println(String.format(format, (Object[]) recordsIterator.next()));
+     }
+     System.out.println(separator);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
new file mode 100644
index 0000000..06b8983
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify a set of vertices which fall in the critical path in a DAG.
+ */
+public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
+ private final Configuration config;
+
+ private static final String[] headers = { "CriticalPath", "Score" };
+
+ private final CSVResult csvResult;
+
+ private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+ private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+ private final String dotFileLocation;
+
+ private static final String CONNECTOR = "-->";
+
+ public VertexLevelCriticalPathAnalyzer(Configuration config) {
+ this.config = config;
+ this.csvResult = new CSVResult(headers);
+ this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
+ }
+
+ @Override public void analyze(DagInfo dagInfo) throws TezException {
+ Map<String, Long> result = Maps.newLinkedHashMap();
+ getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+
+ Map<String, Long> sortedByValues = sortByValues(result);
+ for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
+ List<String> record = Lists.newLinkedList();
+ record.add(entry.getKey());
+ record.add(entry.getValue() + "");
+ csvResult.addRecord(record.toArray(new String[record.size()]));
+ }
+
+ String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+ try {
+ List<String> criticalVertices = null;
+ if (!sortedByValues.isEmpty()) {
+ String criticalPath = sortedByValues.keySet().iterator().next();
+ criticalVertices = getVertexNames(criticalPath);
+ } else {
+ criticalVertices = Lists.newLinkedList();
+ }
+ Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+ } catch (IOException e) {
+ throw new TezException(e);
+ }
+ }
+
+ @Override
+ public CSVResult getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "CriticalPathAnalyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Analyze vertex level critical path of the DAG";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ private static Map<String, Long> sortByValues(Map<String, Long> result) {
+ //Sort result by time in reverse order
+ final Ordering<String> reversValueOrdering =
+ Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
+ Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering);
+ return orderedMap;
+ }
+
+ private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
+ Map<String, Long> result) {
+ String destVertexName = (dest != null) ? (dest.getVertexName()) : "";
+
+ if (dest != null) {
+ time += dest.getTimeTaken();
+ predecessor += destVertexName + CONNECTOR;
+
+ for (VertexInfo incomingVertex : dest.getInputVertices()) {
+ getCriticalPath(predecessor, incomingVertex, time, result);
+ }
+
+ result.put(predecessor, time);
+ }
+ }
+
+ private static List<String> getVertexNames(String criticalPath) {
+ if (Strings.isNullOrEmpty(criticalPath)) {
+ return Lists.newLinkedList();
+ }
+ return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
+ (criticalPath));
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ VertexLevelCriticalPathAnalyzer analyzer = new VertexLevelCriticalPathAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}