You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/11/28 23:34:45 UTC
tez git commit: TEZ-3547. Add TaskAssignment Analyzer (Dharmesh
Kakadia via rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 501a351d5 -> ad7604dd6
TEZ-3547. Add TaskAssignment Analyzer (Dharmesh Kakadia via rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad7604dd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad7604dd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad7604dd
Branch: refs/heads/master
Commit: ad7604dd6e69cb9afed04fbcfd3d75219ce5330c
Parents: 501a351
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Nov 29 05:04:06 2016 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Nov 29 05:04:06 2016 +0530
----------------------------------------------------------------------
.../tez/analyzer/plugins/AnalyzerDriver.java | 2 +
.../plugins/TaskAssignmentAnalyzer.java | 103 +++++++++++++++++++
2 files changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ad7604dd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
index 57b21cb..2273155 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -44,6 +44,8 @@ public class AnalyzerDriver {
"Print slow task details in a DAG");
pgd.addClass("SpillAnalyzer", SpillAnalyzerImpl.class,
"Print spill details in a DAG");
+ pgd.addClass("TaskAssignmentAnalyzer", TaskAssignmentAnalyzer.class,
+ "Print task-to-node assignment details of a DAG");
pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
"Print the task concurrency details in a DAG");
pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,
http://git-wip-us.apache.org/repos/asf/tez/blob/ad7604dd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
new file mode 100644
index 0000000..ce6fa41
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Get the Task assignments on different nodes of the cluster.
+ */
+public class TaskAssignmentAnalyzer extends TezAnalyzerBase
+ implements Analyzer {
+ private final String[] headers = { "vertex", "node", "numTasks", "load" };
+ private final Configuration config;
+ private final CSVResult csvResult;
+
+ public TaskAssignmentAnalyzer(Configuration config) {
+ this.config = config;
+ csvResult = new CSVResult(headers);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ Map<String, Integer> map = new HashMap<>();
+ for (VertexInfo vertex : dagInfo.getVertices()) {
+ map.clear();
+ for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+ Integer previousValue = map.get(attempt.getNodeId());
+ map.put(attempt.getNodeId(),
+ previousValue == null ? 1 : previousValue + 1);
+ }
+ double mean = vertex.getTaskAttempts().size() / Math.max(1.0, map.size());
+ for (Map.Entry<String, Integer> assignment : map.entrySet()) {
+ addARecord(vertex.getVertexName(), assignment.getKey(),
+ assignment.getValue(), assignment.getValue() * 100 / mean);
+ }
+ }
+ }
+
+ private void addARecord(String vertexName, String node, int numTasks,
+ double load) {
+ String[] record = new String[4];
+ record[0] = vertexName;
+ record[1] = node;
+ record[2] = String.valueOf(numTasks);
+ record[3] = String.format("%.2f", load);
+ csvResult.addRecord(record);
+ }
+
+ @Override
+ public Result getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Task Assignment Analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Get the Task assignments on different nodes of the cluster";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ TaskAssignmentAnalyzer analyzer = new TaskAssignmentAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}