You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/04/06 19:59:29 UTC
hive git commit: HIVE-16341: Tez Task Execution Summary has incorrect input record counts on some operators (Jason Dere, reviewed by Gopal V)
Repository: hive
Updated Branches:
refs/heads/master ecabdd3e7 -> bc248642a
HIVE-16341: Tez Task Execution Summary has incorrect input record counts on some operators (Jason Dere, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bc248642
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bc248642
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bc248642
Branch: refs/heads/master
Commit: bc248642a89cce0c6adc456af3452f0f6ae09774
Parents: ecabdd3
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Apr 6 12:58:16 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Apr 6 12:58:16 2017 -0700
----------------------------------------------------------------------
.../hive/ql/exec/tez/monitoring/DAGSummary.java | 55 ++++++++++++++++----
1 file changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bc248642/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
index 1400be4..6982fb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -11,6 +11,7 @@ import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
@@ -59,24 +60,58 @@ class DAGSummary implements PrintSummary {
this.hiveCounters = hiveCounters(dagClient);
}
+ private long hiveInputRecordsFromTezCounters(String vertexName, String inputVertexName) {
+ // Get the counters for the input vertex.
+ Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+ VertexStatus inputVertexStatus = vertexStatus(statusOptions, inputVertexName);
+ final TezCounters inputVertexCounters = inputVertexStatus.getVertexCounters();
+
+ // eg, group name TaskCounter_Map_7_OUTPUT_Reducer_8, counter name OUTPUT_RECORDS
+ String groupName = formattedName("TaskCounter", inputVertexName, vertexName);
+ String counterName = "OUTPUT_RECORDS";
+
+ // Do not create counter if it does not exist -
+ // instead fall back to default behavior for determining input records.
+ TezCounter tezCounter = inputVertexCounters.getGroup(groupName).findCounter(counterName, false);
+ if (tezCounter == null) {
+ return -1;
+ } else {
+ return tezCounter.getValue();
+ }
+ }
+
+ private long hiveInputRecordsFromHiveCounters(String inputVertexName) {
+ // The record count from these counters may not be correct if the input vertex has
+ // edges to more than one vertex, since this value counts the records going to all
+ // destination vertices.
+
+ String intermediateRecordsCounterName = formattedName(
+ ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
+ inputVertexName
+ );
+ String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
+ inputVertexName);
+ return hiveCounterValue(intermediateRecordsCounterName) + hiveCounterValue(recordsOutCounterName);
+ }
+
private long hiveInputRecordsFromOtherVertices(String vertexName) {
List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices();
long result = 0;
for (Vertex inputVertex : inputVerticesList) {
- String intermediateRecordsCounterName = formattedName(
- ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
- inputVertex.getName()
- );
- String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
- inputVertex.getName());
- result += (
- hiveCounterValue(intermediateRecordsCounterName)
- + hiveCounterValue(recordsOutCounterName)
- );
+ long inputVertexRecords = hiveInputRecordsFromTezCounters(vertexName, inputVertex.getName());
+ if (inputVertexRecords < 0) {
+ inputVertexRecords = hiveInputRecordsFromHiveCounters(inputVertex.getName());
+ }
+ result += inputVertexRecords;
}
return result;
}
+ private String formattedName(String counterName, String srcVertexName, String destVertexName) {
+ return String.format("%s_", counterName) + srcVertexName.replace(" ", "_") + "_OUTPUT_" + destVertexName.replace(" ", "_");
+ }
+
private String formattedName(String counterName, String vertexName) {
return String.format("%s_", counterName) + vertexName.replace(" ", "_");
}