You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/11/28 23:34:45 UTC

tez git commit: TEZ-3547. Add TaskAssignment Analyzer (Dharmesh Kakadia via rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 501a351d5 -> ad7604dd6


TEZ-3547. Add TaskAssignment Analyzer (Dharmesh Kakadia via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad7604dd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad7604dd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad7604dd

Branch: refs/heads/master
Commit: ad7604dd6e69cb9afed04fbcfd3d75219ce5330c
Parents: 501a351
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Nov 29 05:04:06 2016 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Nov 29 05:04:06 2016 +0530

----------------------------------------------------------------------
 .../tez/analyzer/plugins/AnalyzerDriver.java    |   2 +
 .../plugins/TaskAssignmentAnalyzer.java         | 103 +++++++++++++++++++
 2 files changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ad7604dd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
index 57b21cb..2273155 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -44,6 +44,8 @@ public class AnalyzerDriver {
           "Print slow task details in a DAG");
       pgd.addClass("SpillAnalyzer", SpillAnalyzerImpl.class,
           "Print spill details in a DAG");
+      pgd.addClass("TaskAssignmentAnalyzer", TaskAssignmentAnalyzer.class,
+          "Print task-to-node assignment details of a DAG");
       pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
           "Print the task concurrency details in a DAG");
       pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,

http://git-wip-us.apache.org/repos/asf/tez/blob/ad7604dd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
new file mode 100644
index 0000000..ce6fa41
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Reports how the task attempts of each vertex in a DAG are spread across
+ * cluster nodes: one CSV row per (vertex, node) with the attempt count and a
+ * "load", i.e. that count as a percentage of the vertex's mean per node.
+ */
+public class TaskAssignmentAnalyzer extends TezAnalyzerBase
+    implements Analyzer {
+  private final String[] headers = { "vertex", "node", "numTasks", "load" };
+  private final Configuration config;
+  private final CSVResult csvResult;
+
+  public TaskAssignmentAnalyzer(Configuration config) {
+    this.config = config;
+    csvResult = new CSVResult(headers);
+  }
+
+  /** Counts attempts per node for each vertex; adds one CSV row per pair. */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Integer> map = new HashMap<>();
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      map.clear();
+      for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+        Integer previousValue = map.get(attempt.getNodeId());
+        map.put(attempt.getNodeId(),
+            previousValue == null ? 1 : previousValue + 1);
+      }
+      double mean = vertex.getTaskAttempts().size() / Math.max(1.0, map.size());
+      for (Map.Entry<String, Integer> assignment : map.entrySet()) {
+        addARecord(vertex.getVertexName(), assignment.getKey(),
+            assignment.getValue(), assignment.getValue() * 100.0 / mean);
+      }
+    }
+  }
+
+  private void addARecord(String vertexName, String node, int numTasks,
+      double load) {
+    // Locale.ROOT pins '.' as the decimal separator regardless of JVM locale.
+    csvResult.addRecord(new String[] { vertexName, node, String.valueOf(numTasks),
+        String.format(java.util.Locale.ROOT, "%.2f", load) });
+  }
+
+  @Override
+  public Result getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Task Assignment Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Get the Task assignments on different nodes of the cluster";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    TaskAssignmentAnalyzer analyzer = new TaskAssignmentAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}