Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/02 19:12:52 UTC
[hive] branch master updated: HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0fd99df HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)
0fd99df is described below
commit 0fd99df99dc07540d8818d179bcdcb2972f09752
Author: Qiang Kang <tk...@gmail.com>
AuthorDate: Sat May 2 12:12:06 2020 -0700
HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hadoop/hive/ql/exec/GroupByOperator.java | 17 +--
.../apache/hadoop/hive/ql/exec/TestOperators.java | 131 +++++++++++++++++++++
2 files changed, 140 insertions(+), 8 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 7220f33..b94e3fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -739,20 +739,21 @@ public class GroupByOperator extends Operator<GroupByDesc> implements IConfigure
// if hash aggregation is not behaving properly, disable it
if (numRowsInput == numRowsCompareHashAggr) {
numRowsCompareHashAggr += groupbyMapAggrInterval;
+ long numRowsProcessed = groupingSetsPresent ? numRowsInput * groupingSets.size() : numRowsInput;
// map-side aggregation should reduce the entries by at-least half
- if (numRowsHashTbl > numRowsInput * minReductionHashAggr) {
+ if (numRowsHashTbl > numRowsProcessed * minReductionHashAggr) {
LOG.warn("Disable Hash Aggr: #hash table = " + numRowsHashTbl
- + " #total = " + numRowsInput + " reduction = " + 1.0
- * (numRowsHashTbl / numRowsInput) + " minReduction = "
- + minReductionHashAggr);
+ + " #numRowsInput = " + numRowsInput + " reduction = " + 1.0 * (numRowsHashTbl / numRowsProcessed)
+ + " minReduction = " + minReductionHashAggr + " groupingSetsPresent = " + groupingSetsPresent
+ + " numRowsProcessed = " + numRowsProcessed);
flushHashTable(true);
hashAggr = false;
} else {
if (LOG.isTraceEnabled()) {
- LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
- + " #total = " + numRowsInput + " reduction = " + 1.0
- * (numRowsHashTbl / numRowsInput) + " minReduction = "
- + minReductionHashAggr);
+ LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl + " #numRowsInput = " + numRowsInput
+ + " reduction = " + 1.0 * (numRowsHashTbl / numRowsProcessed) + " minReduction = "
+ + minReductionHashAggr + " groupingSetsPresent = " + groupingSetsPresent + " numRowsProcessed = "
+ + numRowsProcessed);
}
}
}
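
The gist of the hunk above: when grouping sets are present, each input row is inserted into the hash table once per grouping set, so comparing the hash-table size against the raw input row count made the reduction check trip almost immediately and hash aggregation was switched off every time. Restating the corrected condition as a standalone method for illustration (shouldKeepHashAggr is a hypothetical name, not a Hive API; the field names follow the diff):

// A sketch restating the corrected check from the diff above;
// for illustration only, not the actual Hive method.
static boolean shouldKeepHashAggr(long numRowsInput, long numRowsHashTbl,
    boolean groupingSetsPresent, int numGroupingSets, float minReductionHashAggr) {
  // Each input row produces one hash-table entry per grouping set, so the
  // fair baseline is the number of entries actually offered to the table.
  long numRowsProcessed = groupingSetsPresent ? numRowsInput * numGroupingSets : numRowsInput;
  // Map-side aggregation must shrink the entries to at most
  // minReductionHashAggr of the rows processed to be worth the memory.
  return numRowsHashTbl <= numRowsProcessed * minReductionHashAggr;
}

For example, with 100 input rows, 2 grouping sets, and minReductionHashAggr = 0.5, even a perfect halving of keys per set leaves 100 entries in the table; the old check compared 100 > 100 * 0.5 and disabled hash aggregation, while the new one compares 100 <= 200 * 0.5 and keeps it on.
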
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 8a0606b..3c0a7eb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -32,14 +32,18 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.IOContextMap;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
import org.apache.hadoop.hive.ql.parse.type.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -49,6 +53,7 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -525,4 +530,130 @@ public class TestOperators {
assertEquals(5,
convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, null).getMaxExecutorsOverSubscribeMemory());
}
+
+ @Test public void testHashGroupBy() throws HiveException {
+ InspectableObject[] input = constructHashAggrInputData(5, 3);
+ System.out.println("---------------Begin to Construct Groupby Desc-------------");
+ // 1. Build AggregationDesc
+ String aggregate = "MAX";
+ ExprNodeDesc inputColumn = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col0", "table", false);
+ ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
+ params.add(inputColumn);
+ GenericUDAFEvaluator genericUDAFEvaluator =
+ SemanticAnalyzer.getGenericUDAFEvaluator(aggregate, params, null, false, false);
+ AggregationDesc agg =
+ new AggregationDesc(aggregate, genericUDAFEvaluator, params, false, GenericUDAFEvaluator.Mode.PARTIAL1);
+ ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
+ aggs.add(agg);
+
+ // 2. aggr keys
+ ExprNodeDesc key1 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col1", "table", false);
+ ExprNodeDesc key2 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col2", "table", false);
+ ArrayList<ExprNodeDesc> keys = new ArrayList<>();
+ keys.add(key1);
+ keys.add(key2);
+
+ // 3. outputCols
+ // @see org.apache.hadoop.hive.ql.exec.GroupByOperator.forward
+ // outputColumnNames, including: group by keys, agg evaluators output cols.
+ ArrayList<String> outputColumnNames = new ArrayList<String>();
+ for (int i = 0; i < keys.size() + aggs.size(); i++) {
+ outputColumnNames.add("_col" + i);
+ }
+ // 4. build GroupByDesc desc
+ GroupByDesc desc = new GroupByDesc();
+ desc.setOutputColumnNames(outputColumnNames);
+ desc.setAggregators(aggs);
+ desc.setKeys(keys);
+ desc.setMode(GroupByDesc.Mode.HASH);
+ desc.setMemoryThreshold(1.0f);
+ desc.setGroupByMemoryUsage(1.0f);
+ // minReductionHashAggr
+ desc.setMinReductionHashAggr(0.5f);
+
+ // 5. Configure hive conf and Build group by operator
+ HiveConf hconf = new HiveConf();
+ HiveConf.setIntVar(hconf, HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL, 1);
+
+ // 6. test hash aggr without grouping sets
+ System.out.println("---------------Begin to test hash group by without grouping sets-------------");
+ int withoutGroupingSetsExpectSize = 3;
+ GroupByOperator op = new GroupByOperator(new CompilationOpContext());
+ op.setConf(desc);
+ testHashAggr(op, hconf, input, withoutGroupingSetsExpectSize);
+
+ // 7. test hash aggr with grouping sets
+ System.out.println("---------------Begin to test hash group by with grouping sets------------");
+ int groupingSetsExpectSize = 6;
+
+ desc.setGroupingSetsPresent(true);
+ ArrayList<Long> groupingSets = new ArrayList<>();
+ // groupingSets
+ groupingSets.add(1L);
+ groupingSets.add(2L);
+ desc.setListGroupingSets(groupingSets);
+ // add grouping sets dummy key
+ ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+ keys.add(groupingSetDummyKey);
+ desc.setKeys(keys);
+ // groupingSet Position
+ desc.setGroupingSetPosition(2);
+ op = new GroupByOperator(new CompilationOpContext());
+ op.setConf(desc);
+ testHashAggr(op, hconf, input, groupingSetsExpectSize);
+ }
+
+ private void testHashAggr(GroupByOperator op, HiveConf hconf, InspectableObject[] r, int expectOutputSize)
+ throws HiveException {
+ // 1. Collect operator to observe the output of the group by operator
+ CollectDesc cd = new CollectDesc(expectOutputSize + 10);
+ CollectOperator cdop = (CollectOperator) OperatorFactory.getAndMakeChild(cd, op);
+ op.initialize(hconf, new ObjectInspector[] { r[0].oi });
+ // 2. Evaluate on rows and check hashAggr flag
+ for (int i = 0; i < r.length; i++) {
+ op.process(r[i].o, 0);
+ }
+ op.close(false);
+ InspectableObject io = new InspectableObject();
+ int output = 0;
+ // 3. Print group by results
+ do {
+ cdop.retrieve(io);
+ if (io.o != null) {
+ System.out.println("io.o = " + io.o);
+ output++;
+ }
+ } while (io.o != null);
+ // 4. Check partial result size
+ assertEquals(expectOutputSize, output);
+ }
+
+ private InspectableObject[] constructHashAggrInputData(int rowNum, int rowNumWithSameKeys) {
+ InspectableObject[] r;
+ r = new InspectableObject[rowNum];
+ ArrayList<String> names = new ArrayList<String>(3);
+ names.add("col0");
+ names.add("col1");
+ names.add("col2");
+ ArrayList<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>(3);
+ objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ // 3 rows with the same col1, col2
+ for (int i = 0; i < rowNum; i++) {
+ ArrayList<String> data = new ArrayList<String>();
+ data.add("" + i);
+ data.add("" + (i < rowNumWithSameKeys ? -1 : i));
+ data.add("" + (i < rowNumWithSameKeys ? -1 : i));
+ try {
+ r[i] = new InspectableObject();
+ r[i].o = data;
+ r[i].oi = ObjectInspectorFactory.getStandardStructObjectInspector(names, objectInspectors);
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return r;
+ }
+
}
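
As a sanity check on the expected sizes in the test above: the input has 5 rows, 3 of which share the same (col1, col2) pair, so plain hashing on (col1, col2) yields 3 distinct groups. With grouping sets {1, 2} (bitmasks that, with groupingSetPosition = 2, each select one of the two key columns), every row is aggregated once per set, giving 3 groups per column and 6 in total. A throwaway sketch of that arithmetic, assuming the bitmask reading just described (plain Java, not Hive code):

import java.util.HashSet;
import java.util.Set;

// Throwaway illustration of the expected group counts in the test above.
public class ExpectedGroupCounts {
  public static void main(String[] args) {
    int rowNum = 5, rowNumWithSameKeys = 3;
    Set<String> plainKeys = new HashSet<>(); // keys without grouping sets
    Set<String> col1Keys = new HashSet<>();  // grouping set selecting col1 only
    Set<String> col2Keys = new HashSet<>();  // grouping set selecting col2 only
    for (int i = 0; i < rowNum; i++) {
      // Mirrors constructHashAggrInputData: the first 3 rows share keys.
      String col1 = "" + (i < rowNumWithSameKeys ? -1 : i);
      String col2 = "" + (i < rowNumWithSameKeys ? -1 : i);
      plainKeys.add(col1 + "|" + col2);
      col1Keys.add(col1);
      col2Keys.add(col2);
    }
    System.out.println(plainKeys.size());                  // 3 = withoutGroupingSetsExpectSize
    System.out.println(col1Keys.size() + col2Keys.size()); // 6 = groupingSetsExpectSize
  }
}
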