You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/14 04:44:49 UTC
[hive] branch master updated: HIVE-23423 : Check of disabling hash
aggregation ignores grouping set ( Gopal V via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9f40d7c HIVE-23423 : Check of disabling hash aggregation ignores grouping set ( Gopal V via Ashutosh Chauhan)
9f40d7c is described below
commit 9f40d7cc1d889aa3079f3f494cf810fabe326e44
Author: Gopal V <go...@apache.org>
AuthorDate: Wed May 13 21:42:29 2020 -0700
HIVE-23423 : Check of disabling hash aggregation ignores grouping set ( Gopal V via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hive/ql/exec/vector/VectorGroupByOperator.java | 21 ++-
.../ql/exec/vector/TestVectorGroupByOperator.java | 207 +++++++++++++++++++--
2 files changed, 209 insertions(+), 19 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index f104c13..d4d18ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -645,9 +645,26 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d",
numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr)));
}
- if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
+ /*
+ * The grouping sets expand the hash sizes by producing intermediate keys. 3 grouping sets
+ * of (),(col1),(col1,col2), will turn 10 rows into 30 rows. If the col1 has an nDV of 2 and
+ * col2 has nDV of 5, then this turns into a maximum of 1+2+(2*5) or 13 keys into the
+ * hashtable.
+ *
+ * So you get 10 rows in and 13 rows out, which is a reduction of ~2x vs Streaming mode,
+ * but it is an increase if the grouping-set is not accounted for.
+ *
+ * For performance, it is definitely better to send 13 rows out to shuffle and not 30.
+ *
+ * Particularly if the same nDVs are repeated for a thousand rows, this would send a
+ * thousand rows via streaming to a single reducer which owns the empty grouping set,
+ * instead of sending 1 from the hash.
+ *
+ */
+ final int groupingExpansion = (groupingSets != null) ? groupingSets.length : 1;
+ final long intermediateKeyCount = sumBatchSize * groupingExpansion;
+ if (numEntriesHashTable > intermediateKeyCount * minReductionHashAggr) {
flush(true);
-
changeToStreamingMode();
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index e8586fc..12df385 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -111,9 +112,10 @@ public class TestVectorGroupByOperator {
String column,
TypeInfo typeInfo) {
- ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
- ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
+ TypeInfo[] typeInfos = new TypeInfo[] {typeInfo};
+ ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>(1);
+ ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
params.add(inputColumn);
AggregationDesc agg = new AggregationDesc();
@@ -121,10 +123,7 @@ public class TestVectorGroupByOperator {
agg.setMode(mode);
agg.setParameters(params);
- TypeInfo[] typeInfos = new TypeInfo[] { typeInfo };
-
final GenericUDAFEvaluator evaluator;
- PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
try {
switch (aggregate) {
case "count":
@@ -232,14 +231,13 @@ public class TestVectorGroupByOperator {
return new Pair<GroupByDesc,VectorGroupByDesc>(desc, vectorDesc);
}
-
private static Pair<GroupByDesc,VectorGroupByDesc> buildKeyGroupByDesc(
VectorizationContext ctx,
String aggregate,
String column,
TypeInfo dataTypeInfo,
- String key,
- TypeInfo keyTypeInfo) {
+ String[] keys,
+ TypeInfo[] keyTypeInfos) {
Pair<GroupByDesc,VectorGroupByDesc> pair =
buildGroupByDescType(ctx, aggregate, GenericUDAFEvaluator.Mode.PARTIAL1, column, dataTypeInfo);
@@ -247,10 +245,14 @@ public class TestVectorGroupByOperator {
VectorGroupByDesc vectorDesc = pair.snd;
vectorDesc.setProcessingMode(ProcessingMode.HASH);
- ExprNodeDesc keyExp = buildColumnDesc(ctx, key, keyTypeInfo);
- ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
- keys.add(keyExp);
- desc.setKeys(keys);
+ ArrayList<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>(keys.length);
+ for (int i = 0; i < keys.length; i++) {
+ final String key = keys[i];
+ final TypeInfo keyTypeInfo = keyTypeInfos[i];
+ final ExprNodeDesc keyExp = buildColumnDesc(ctx, key, keyTypeInfo);
+ keyExprs.add(keyExp);
+ }
+ desc.setKeys(keyExprs);
desc.getOutputColumnNames().add("_col1");
@@ -269,7 +271,8 @@ public class TestVectorGroupByOperator {
Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "max",
"Value", TypeInfoFactory.longTypeInfo,
- "Key", TypeInfoFactory.longTypeInfo);
+ new String[] {"Key"},
+ new TypeInfo[] {TypeInfoFactory.longTypeInfo});
GroupByDesc desc = pair.fst;
VectorGroupByDesc vectorDesc = pair.snd;
@@ -359,7 +362,8 @@ public class TestVectorGroupByOperator {
Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max",
"Value", TypeInfoFactory.longTypeInfo,
- "Key", TypeInfoFactory.longTypeInfo);
+ new String[] {"Key"},
+ new TypeInfo[] {TypeInfoFactory.longTypeInfo});
GroupByDesc desc = pair.fst;
VectorGroupByDesc vectorDesc = pair.snd;
@@ -441,6 +445,170 @@ public class TestVectorGroupByOperator {
}
@Test
+ public void testRollupAggregation() throws HiveException {
+
+ List<String> mapColumnNames = new ArrayList<String>();
+ mapColumnNames.add("k1");
+ mapColumnNames.add("k2");
+ mapColumnNames.add("v");
+ VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+ // select count(v) from name group by rollup (k1,k2);
+
+ Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "count",
+ "v", TypeInfoFactory.longTypeInfo,
+ new String[] { "k1", "k2" },
+ new TypeInfo[] {TypeInfoFactory.longTypeInfo, TypeInfoFactory.longTypeInfo});
+ GroupByDesc desc = pair.fst;
+ VectorGroupByDesc vectorDesc = pair.snd;
+
+ desc.setGroupingSetsPresent(true);
+ ArrayList<Long> groupingSets = new ArrayList<>();
+ // groupingSets
+ groupingSets.add(0L);
+ groupingSets.add(1L);
+ groupingSets.add(2L);
+ desc.setListGroupingSets(groupingSets);
+ // add grouping sets dummy key
+ ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+ // this only works because we used an arraylist in buildKeyGroupByDesc
+ // don't do this in actual compiler
+ desc.getKeys().add(groupingSetDummyKey);
+ // groupingSet Position
+ desc.setGroupingSetPosition(2);
+
+ CompilationOpContext cCtx = new CompilationOpContext();
+
+ desc.setMinReductionHashAggr(0.5f);
+ // Set really low check interval setting
+ hconf.set("hive.groupby.mapaggr.checkinterval", "10");
+ hconf.set("hive.vectorized.groupby.checkinterval", "10");
+
+ Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+ VectorGroupByOperator vgo =
+ (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+ FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+ vgo.initialize(hconf, null);
+
+ this.outputRowCount = 0;
+ out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+ @Override
+ public void inspectRow(Object row, int tag) throws HiveException {
+ ++outputRowCount;
+ }
+ });
+
+ // k1 has nDV of 2
+ Iterable<Object> k1 = new Iterable<Object>() {
+ @Override
+ public Iterator<Object> iterator() {
+ return new Iterator<Object>() {
+ int value = 0;
+ int ndv = 2;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public Object next() {
+ value = (value + 1) % ndv;
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
+
+ // k2 has nDV of 6
+ Iterable<Object> k2 = new Iterable<Object>() {
+ @Override
+ public Iterator<Object> iterator() {
+ return new Iterator<Object>() {
+ int value = 0;
+ int ndv = 6;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public Object next() {
+ value = (value + 1) % ndv;
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
+
+ // always returns 0; the value is irrelevant because we're running "count"
+ Iterable<Object> v = new Iterable<Object>() {
+ @Override
+ public Iterator<Object> iterator() {
+ return new Iterator<Object>() {
+ int value = 0;
+ int ndv = 1;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public Object next() {
+ value = (value + 1) % ndv;
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
+
+ // vectorized row batches of 2 rows each
+ FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+ 2,
+ new String[] {"long", "long", "long", "long"},
+ k1,
+ k2,
+ v,
+ v); // output col
+
+ long countRowsProduced = 0;
+ for (VectorizedRowBatch unit: data) {
+ // after 24 rows, we'd have seen all the keys
+ // find 14 keys in the hashmap
+ // but 24*0.5 = 12
+ // won't turn off hash mode because of the 3 grouping sets
+ // if it turns off the hash mode, we'd get 14 + 3*(100-24) rows
+ countRowsProduced += unit.size;
+ vgo.process(unit, 0);
+
+ if (countRowsProduced >= 100) {
+ break;
+ }
+
+ }
+ vgo.close(false);
+ // all groupings
+ // 10 keys generates 14 rows with the rollup
+ assertEquals(1+3+10, outputRowCount);
+ }
+
+ @Test
public void testMaxHTEntriesFlush() throws HiveException {
List<String> mapColumnNames = new ArrayList<String>();
@@ -450,7 +618,8 @@ public class TestVectorGroupByOperator {
Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "max",
"Value", TypeInfoFactory.longTypeInfo,
- "Key", TypeInfoFactory.longTypeInfo);
+ new String[] {"Key"},
+ new TypeInfo[] {TypeInfoFactory.longTypeInfo});
GroupByDesc desc = pair.fst;
VectorGroupByDesc vectorDesc = pair.snd;
@@ -2784,7 +2953,9 @@ public class TestVectorGroupByOperator {
Set<Object> keys = new HashSet<Object>();
Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, aggregateName, "Value",
- TypeInfoFactory.longTypeInfo, "Key", TypeInfoFactory.longTypeInfo);
+ TypeInfoFactory.longTypeInfo,
+ new String[] {"Key"},
+ new TypeInfo[] {TypeInfoFactory.longTypeInfo});
GroupByDesc desc = pair.fst;
VectorGroupByDesc vectorDesc = pair.snd;
@@ -2856,7 +3027,9 @@ public class TestVectorGroupByOperator {
Set<Object> keys = new HashSet<Object>();
Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, aggregateName, "Value",
- dataTypeInfo, "Key", TypeInfoFactory.stringTypeInfo);
+ dataTypeInfo,
+ new String[] {"Key"},
+ new TypeInfo[] {TypeInfoFactory.stringTypeInfo});
GroupByDesc desc = pair.fst;
VectorGroupByDesc vectorDesc = pair.snd;