Posted to commits@hive.apache.org by jd...@apache.org on 2017/04/06 19:59:29 UTC

hive git commit: HIVE-16341: Tez Task Execution Summary has incorrect input record counts on some operators (Jason Dere, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master ecabdd3e7 -> bc248642a


HIVE-16341: Tez Task Execution Summary has incorrect input record counts on some operators (Jason Dere, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bc248642
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bc248642
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bc248642

Branch: refs/heads/master
Commit: bc248642a89cce0c6adc456af3452f0f6ae09774
Parents: ecabdd3
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Apr 6 12:58:16 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Apr 6 12:58:16 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/tez/monitoring/DAGSummary.java | 55 ++++++++++++++++----
 1 file changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bc248642/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
index 1400be4..6982fb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -11,6 +11,7 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -59,24 +60,58 @@ class DAGSummary implements PrintSummary {
     this.hiveCounters = hiveCounters(dagClient);
   }
 
+  private long hiveInputRecordsFromTezCounters(String vertexName, String inputVertexName) {
+    // Get the counters for the input vertex.
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    VertexStatus inputVertexStatus = vertexStatus(statusOptions, inputVertexName);
+    final TezCounters inputVertexCounters = inputVertexStatus.getVertexCounters();
+
+    // eg, group name TaskCounter_Map_7_OUTPUT_Reducer_8, counter name OUTPUT_RECORDS
+    String groupName = formattedName("TaskCounter", inputVertexName, vertexName);
+    String counterName = "OUTPUT_RECORDS";
+
+    // Do not create counter if it does not exist -
+    // instead fall back to default behavior for determining input records.
+    TezCounter tezCounter = inputVertexCounters.getGroup(groupName).findCounter(counterName, false);
+    if (tezCounter == null) {
+      return -1;
+    } else {
+      return tezCounter.getValue();
+    }
+  }
+
+  private long hiveInputRecordsFromHiveCounters(String inputVertexName) {
+    // The record count from these counters may not be correct if the input vertex has
+    // edges to more than one vertex, since this value counts the records going to all
+    // destination vertices.
+
+    String intermediateRecordsCounterName = formattedName(
+        ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
+        inputVertexName
+    );
+    String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
+        inputVertexName);
+    return hiveCounterValue(intermediateRecordsCounterName) + hiveCounterValue(recordsOutCounterName);
+  }
+
   private long hiveInputRecordsFromOtherVertices(String vertexName) {
     List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices();
     long result = 0;
     for (Vertex inputVertex : inputVerticesList) {
-      String intermediateRecordsCounterName = formattedName(
-          ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
-          inputVertex.getName()
-      );
-      String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
-          inputVertex.getName());
-      result += (
-          hiveCounterValue(intermediateRecordsCounterName)
-              + hiveCounterValue(recordsOutCounterName)
-      );
+      long inputVertexRecords = hiveInputRecordsFromTezCounters(vertexName, inputVertex.getName());
+      if (inputVertexRecords < 0) {
+        inputVertexRecords = hiveInputRecordsFromHiveCounters(inputVertex.getName());
+      }
+      result += inputVertexRecords;
     }
     return result;
   }
 
+  private String formattedName(String counterName, String srcVertexName, String destVertexName) {
+    return String.format("%s_", counterName) + srcVertexName.replace(" ", "_") + "_OUTPUT_" + destVertexName.replace(" ", "_");
+  }
+
   private String formattedName(String counterName, String vertexName) {
     return String.format("%s_", counterName) + vertexName.replace(" ", "_");
   }