You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2012/11/02 13:00:27 UTC
svn commit: r1404933 - in /hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/test/org/apache/hadoop/hive/ql/hooks/
ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
Author: namit
Date: Fri Nov 2 12:00:26 2012
New Revision: 1404933
URL: http://svn.apache.org/viewvc?rev=1404933&view=rev
Log:
HIVE-3570 Add/fix facility to collect operator-specific statistics in hive + add hash-in/hash-out
counter for GroupBy Optr (Satadru Pan via namit)
Added:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java
hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q
hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out
Modified:
hive/trunk/build-common.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
Modified: hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hive/trunk/build-common.xml?rev=1404933&r1=1404932&r2=1404933&view=diff
==============================================================================
--- hive/trunk/build-common.xml (original)
+++ hive/trunk/build-common.xml Fri Nov 2 12:00:26 2012
@@ -57,7 +57,7 @@
<property name="test.output" value="true"/>
<property name="test.junit.output.format" value="xml"/>
<property name="test.junit.output.usefile" value="true"/>
- <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q"/>
+ <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q"/>
<property name="minimr.query.negative.files" value="cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q" />
<property name="test.silent" value="true"/>
<property name="hadoopVersion" value="${hadoop.version.ant-internal}"/>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1404933&r1=1404932&r2=1404933&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Nov 2 12:00:26 2012
@@ -58,12 +58,12 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -78,10 +78,11 @@ public class GroupByOperator extends Ope
private static final Log LOG = LogFactory.getLog(GroupByOperator.class
.getName());
-
private static final long serialVersionUID = 1L;
private static final int NUMROWSESTIMATESIZE = 1000;
+ public static final String counterNameHashOut = "COUNT_HASH_OUT";
+
protected transient ExprNodeEvaluator[] keyFields;
protected transient ObjectInspector[] keyObjectInspectors;
@@ -1068,6 +1069,12 @@ public class GroupByOperator extends Ope
public void closeOp(boolean abort) throws HiveException {
if (!abort) {
try {
+ // put the hash related stats in statsMap if applicable, so that they
+ // are sent to jt as counters
+ if (hashAggr) {
+ incrCounter(counterNameHashOut, numRowsHashTbl);
+ }
+
// If there is no grouping key and no row came to this operator
if (firstRow && (keyFields.length == 0)) {
firstRow = false;
@@ -1127,6 +1134,13 @@ public class GroupByOperator extends Ope
}
}
+ @Override
+ protected List<String> getAdditionalCounters() { // exposes the wrapped COUNT_HASH_OUT name, which closeOp increments via incrCounter when hashAggr is set
+ List<String> ctrList = new ArrayList<String>();
+ ctrList.add(getWrappedCounterName(counterNameHashOut));
+ return ctrList;
+ }
+
// Group by contains the columns needed - no need to aggregate from children
public List<String> genColLists(
HashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1404933&r1=1404932&r2=1404933&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Nov 2 12:00:26 2012
@@ -548,11 +548,9 @@ public abstract class Operator<T extends
state = State.CLOSE;
LOG.info(id + " finished. closing... ");
- if (counterNameToEnum != null) {
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(numOutputRowsCntr, outputRows);
- incrCounter(timeTakenCntr, totalTime);
- }
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(numOutputRowsCntr, outputRows);
+ incrCounter(timeTakenCntr, totalTime);
LOG.info(id + " forwarded " + cntr + " rows");
@@ -766,10 +764,8 @@ public abstract class Operator<T extends
throws HiveException {
if ((++outputRows % 1000) == 0) {
- if (counterNameToEnum != null) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
- }
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
}
if (isLogInfoEnabled) {
@@ -1107,16 +1103,13 @@ public abstract class Operator<T extends
*/
private void preProcessCounter() {
inputRows++;
-
- if (counterNameToEnum != null) {
- if ((inputRows % 1000) == 0) {
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(timeTakenCntr, totalTime);
- inputRows = 0;
- totalTime = 0;
- }
- beginTime = System.currentTimeMillis();
+ if ((inputRows % 1000) == 0) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ inputRows = 0;
+ totalTime = 0;
}
+ beginTime = System.currentTimeMillis();
}
/**
@@ -1135,7 +1128,11 @@ public abstract class Operator<T extends
* @param amount
*/
protected void incrCounter(String name, long amount) {
- String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name;
+ if(counterNameToEnum == null) {
+ return;
+ }
+
+ String counterName = getWrappedCounterName(name);
ProgressCounter pc = counterNameToEnum.get(counterName);
// Currently, we maintain fixed number of counters per plan - in case of a
@@ -1161,6 +1158,10 @@ public abstract class Operator<T extends
return operatorId;
}
+ public final String getWrappedCounterName(String ctrName) { // formats counterNameFormat, i.e. CNTR_NAME_<operatorId>_<ctrName>
+ return String.format(counterNameFormat, getOperatorId(), ctrName);
+ }
+
public void initOperatorId() {
setOperatorId(getName() + "_" + this.id);
}
@@ -1216,7 +1217,7 @@ public abstract class Operator<T extends
return false;
}
- String counterName = "CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr;
+ String counterName = getWrappedCounterName(fatalErrorCntr);
ProgressCounter pc = counterNameToEnum.get(counterName);
// Currently, we maintain fixed number of counters per plan - in case of a
@@ -1292,14 +1293,16 @@ public abstract class Operator<T extends
protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
protected static String timeTakenCntr = "TIME_TAKEN";
protected static String fatalErrorCntr = "FATAL_ERROR";
+ private static String counterNameFormat = "CNTR_NAME_%s_%s";
public void initializeCounters() {
initOperatorId();
counterNames = new ArrayList<String>();
- counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numInputRowsCntr);
- counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numOutputRowsCntr);
- counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + timeTakenCntr);
- counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr);
+ counterNames.add(getWrappedCounterName(numInputRowsCntr));
+ counterNames.add(getWrappedCounterName(numOutputRowsCntr));
+ counterNames.add(getWrappedCounterName(timeTakenCntr));
+ counterNames.add(getWrappedCounterName(fatalErrorCntr));
+ /* getAdditionalCounter should return Wrapped counters */
List<String> newCntrs = getAdditionalCounters();
if (newCntrs != null) {
counterNames.addAll(newCntrs);
@@ -1308,9 +1311,11 @@ public abstract class Operator<T extends
/*
* By default, the list is empty - if an operator wants to add more counters,
- * it should override this method and provide the new list.
+ * it should override this method and provide the new list. Counter names returned
+ * by this method should be wrapped counter names (i.e the strings should be passed
+ * through getWrappedCounterName).
*/
- private List<String> getAdditionalCounters() {
+ protected List<String> getAdditionalCounters() {
return null;
}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java?rev=1404933&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java Fri Nov 2 12:00:26 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+public class OptrStatGroupByHook implements ExecuteWithHookContext { // test post-exec hook that prints GROUPBY HASH_OUT counters
+
+ public void run(HookContext hookContext) {
+ HiveConf conf = hookContext.getConf();
+
+ List<TaskRunner> completedTasks = hookContext.getCompleteTaskList();
+
+ boolean enableProgress = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS); // NOTE(review): read but never used below
+
+ /** For each completed task, visit the operator tree and, if an operator is GROUPBY,
+ * print its HASH_OUT operator-level counter value.
+ **/
+ if (completedTasks != null) {
+ for (TaskRunner taskRunner : completedTasks) {
+ Task<? extends Serializable> task = taskRunner.getTask();
+ if (task.isMapRedTask() && !task.isMapRedLocalTask()) {
+ Set<Operator<? extends OperatorDesc>> optrSet = getOptrsForTask(task);
+ for (Operator<? extends OperatorDesc> optr : optrSet) {
+ if (optr.getType() == OperatorType.GROUPBY) {
+ printCounterValue(optr.getCounters());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void printCounterValue(HashMap<String, Long> ctrs) { // prints name=value for every counter whose name contains HASH_OUT
+ for (String ctrName : ctrs.keySet()) {
+ if (ctrName.contains("HASH_OUT")) {
+ SessionState.getConsole().printError(ctrName+"="+ctrs.get(ctrName));
+ }
+ }
+ }
+
+ private Set<Operator<? extends OperatorDesc>> getOptrsForTask( // collects every operator reachable from the task's top operators
+ Task<? extends Serializable> task) {
+
+ Collection<Operator<? extends OperatorDesc>> topOptrs = task.getTopOperators();
+ Set<Operator<? extends OperatorDesc>> allOptrs =
+ new HashSet<Operator<? extends OperatorDesc>>();
+ Queue<Operator<? extends OperatorDesc>> opsToVisit =
+ new LinkedList<Operator<? extends OperatorDesc>>();
+ if(topOptrs != null) {
+ opsToVisit.addAll(topOptrs);
+ addChildOptrs(opsToVisit, allOptrs);
+ }
+
+ return allOptrs;
+ }
+
+ private void addChildOptrs( // breadth-first walk: drains opsToVisit, recording each operator in opsVisited
+ Queue<Operator<? extends OperatorDesc>> opsToVisit,
+ Set<Operator<? extends OperatorDesc>> opsVisited) {
+
+ if(opsToVisit == null || opsVisited == null) {
+ return;
+ }
+
+ while (opsToVisit.peek() != null) {
+ Operator<? extends OperatorDesc> op = opsToVisit.remove();
+ opsVisited.add(op);
+ if (op.getChildOperators() != null) {
+ for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+ if (!opsVisited.contains(childOp)) { // skip already-visited children; a child queued twice is harmless since opsVisited is a Set
+ opsToVisit.add(childOp);
+ }
+ }
+ }
+ }
+ }
+}
Added: hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q?rev=1404933&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q Fri Nov 2 12:00:26 2012
@@ -0,0 +1,6 @@
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.OptrStatGroupByHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+-- This test executes the OptrStatGroupBy hook which prints the optr level
+-- stats of GROUPBY optr present is the plan of below query
+SELECT count(1) FROM src;
Added: hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out?rev=1404933&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out Fri Nov 2 12:00:26 2012
@@ -0,0 +1,8 @@
+PREHOOK: query: -- This test executes the OptrStatGroupBy hook which prints the optr level
+-- stats of GROUPBY optr present is the plan of below query
+SELECT count(1) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+CNTR_NAME_GBY_2_COUNT_HASH_OUT=1
+500