Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/14 04:44:49 UTC

[hive] branch master updated: HIVE-23423 : Check of disabling hash aggregation ignores grouping set (Gopal V via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f40d7c  HIVE-23423 : Check of disabling hash aggregation ignores grouping set (Gopal V via Ashutosh Chauhan)
9f40d7c is described below

commit 9f40d7cc1d889aa3079f3f494cf810fabe326e44
Author: Gopal V <go...@apache.org>
AuthorDate: Wed May 13 21:42:29 2020 -0700

    HIVE-23423 : Check of disabling hash aggregation ignores grouping set (Gopal V via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/ql/exec/vector/VectorGroupByOperator.java |  21 ++-
 .../ql/exec/vector/TestVectorGroupByOperator.java  | 207 +++++++++++++++++++--
 2 files changed, 209 insertions(+), 19 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index f104c13..d4d18ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -645,9 +645,26 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
           LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d",
               numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr)));
         }
-        if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
+        /*
+         * Grouping sets expand the hash table by producing intermediate keys: the 3
+         * grouping sets (), (col1), (col1,col2) turn 10 rows into 30 rows. If col1 has
+         * an nDV of 2 and col2 has an nDV of 5, those rows produce at most 1+3+(2*5),
+         * or 14, distinct keys in the hashtable.
+         *
+         * So 10 rows go in and 14 rows come out: a ~2x reduction versus the 30 rows
+         * that streaming mode would emit, but an apparent increase if the grouping-set
+         * expansion is not accounted for.
+         *
+         * For performance, it is clearly better to send 14 rows to the shuffle, not 30.
+         *
+         * In particular, if the same nDVs repeat for a thousand rows, streaming mode
+         * would send a thousand rows to the single reducer that owns the empty grouping
+         * set, instead of the 1 row that hash mode sends.
+         */
+        final int groupingExpansion = (groupingSets != null) ? groupingSets.length : 1;
+        final long intermediateKeyCount = sumBatchSize * groupingExpansion;
+        if (numEntriesHashTable > intermediateKeyCount * minReductionHashAggr) {
           flush(true);
-
           changeToStreamingMode();
         }
       }
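
To make the arithmetic in the new comment concrete, the following standalone sketch mirrors the corrected check. It is a minimal sketch under assumed names: HashAggrCheckSketch and shouldSwitchToStreaming are illustrative, not Hive's API; only the final comparison corresponds to the patched code.

    // A minimal sketch, not Hive's API; the last comparison mirrors the
    // check that the patch adds to checkHashModeEfficiency.
    public class HashAggrCheckSketch {

      // returns true when the hash table is not reducing the row count
      // enough and the operator should fall back to streaming mode
      static boolean shouldSwitchToStreaming(long numEntriesHashTable,
          long sumBatchSize, int groupingSetCount, float minReductionHashAggr) {
        // every input row becomes one intermediate key per grouping set
        final long intermediateKeyCount = sumBatchSize * (long) groupingSetCount;
        return numEntriesHashTable > intermediateKeyCount * minReductionHashAggr;
      }

      public static void main(String[] args) {
        // the scenario from the comment: 10 rows in, 14 distinct keys,
        // 3 grouping sets, minReductionHashAggr of 0.5
        System.out.println(shouldSwitchToStreaming(14, 10, 1, 0.5f)); // true  (old check: 14 > 5)
        System.out.println(shouldSwitchToStreaming(14, 10, 3, 0.5f)); // false (new check: 14 > 15 fails)
      }
    }

With the pre-patch check, 14 keys from 10 rows looks like an expansion and hash mode is abandoned; counting the 30 intermediate keys instead keeps the hash table alive.
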
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index e8586fc..12df385 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -111,9 +112,10 @@ public class TestVectorGroupByOperator {
       String column,
       TypeInfo typeInfo) {
 
-    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
 
-    ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
+    TypeInfo[] typeInfos = new TypeInfo[] {typeInfo};
+    ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>(1);
+    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
     params.add(inputColumn);
 
     AggregationDesc agg = new AggregationDesc();
@@ -121,10 +123,7 @@ public class TestVectorGroupByOperator {
     agg.setMode(mode);
     agg.setParameters(params);
 
-    TypeInfo[] typeInfos = new TypeInfo[] { typeInfo };
-
     final GenericUDAFEvaluator evaluator;
-    PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
     try {
       switch (aggregate) {
       case "count":
@@ -232,14 +231,13 @@ public class TestVectorGroupByOperator {
     return new Pair<GroupByDesc,VectorGroupByDesc>(desc, vectorDesc);
   }
 
-
   private static Pair<GroupByDesc,VectorGroupByDesc> buildKeyGroupByDesc(
       VectorizationContext ctx,
       String aggregate,
       String column,
       TypeInfo dataTypeInfo,
-      String key,
-      TypeInfo keyTypeInfo) {
+      String[] keys,
+      TypeInfo[] keyTypeInfos) {
 
     Pair<GroupByDesc,VectorGroupByDesc> pair =
         buildGroupByDescType(ctx, aggregate, GenericUDAFEvaluator.Mode.PARTIAL1, column, dataTypeInfo);
@@ -247,10 +245,14 @@ public class TestVectorGroupByOperator {
     VectorGroupByDesc vectorDesc = pair.snd;
     vectorDesc.setProcessingMode(ProcessingMode.HASH);
 
-    ExprNodeDesc keyExp = buildColumnDesc(ctx, key, keyTypeInfo);
-    ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
-    keys.add(keyExp);
-    desc.setKeys(keys);
+    ArrayList<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>(keys.length);
+    for (int i = 0; i < keys.length; i++) {
+      final String key = keys[i];
+      final TypeInfo keyTypeInfo = keyTypeInfos[i];
+      final ExprNodeDesc keyExp = buildColumnDesc(ctx, key, keyTypeInfo);
+      keyExprs.add(keyExp);
+    }
+    desc.setKeys(keyExprs);
 
     desc.getOutputColumnNames().add("_col1");
 
@@ -269,7 +271,8 @@ public class TestVectorGroupByOperator {
 
     Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "max",
         "Value", TypeInfoFactory.longTypeInfo,
-        "Key", TypeInfoFactory.longTypeInfo);
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
 
@@ -359,7 +362,8 @@ public class TestVectorGroupByOperator {
 
       Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max",
         "Value", TypeInfoFactory.longTypeInfo,
-        "Key", TypeInfoFactory.longTypeInfo);
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
       GroupByDesc desc = pair.fst;
       VectorGroupByDesc vectorDesc = pair.snd;
 
@@ -441,6 +445,170 @@ public class TestVectorGroupByOperator {
   }
 
   @Test
+  public void testRollupAggregation() throws HiveException {
+
+    List<String> mapColumnNames = new ArrayList<String>();
+    mapColumnNames.add("k1");
+    mapColumnNames.add("k2");
+    mapColumnNames.add("v");
+    VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+    // select count(v) from name group by rollup (k1,k2);
+
+    Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "count",
+        "v", TypeInfoFactory.longTypeInfo,
+        new String[] { "k1", "k2" },
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo, TypeInfoFactory.longTypeInfo});
+    GroupByDesc desc = pair.fst;
+    VectorGroupByDesc vectorDesc = pair.snd;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // the three grouping set ids for rollup(k1,k2)
+    groupingSets.add(0L);
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+    // this only works because buildKeyGroupByDesc used a mutable ArrayList;
+    // do not do this in the actual compiler
+    desc.getKeys().add(groupingSetDummyKey);
+    // position of the grouping-set id among the keys
+    desc.setGroupingSetPosition(2);
+
+    CompilationOpContext cCtx = new CompilationOpContext();
+
+    desc.setMinReductionHashAggr(0.5f);
+    // set a very low check interval
+    hconf.set("hive.groupby.mapaggr.checkinterval", "10");
+    hconf.set("hive.vectorized.groupby.checkinterval", "10");
+
+    Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+    VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+    FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+    vgo.initialize(hconf, null);
+
+    this.outputRowCount = 0;
+    out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        ++outputRowCount;
+      }
+    });
+
+    // k1 has an nDV of 2
+    Iterable<Object> k1 = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object>() {
+          int value = 0;
+          int ndv = 2;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            value = (value + 1) % ndv;
+            return value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+
+    // k2 cycles through 6 distinct values (nDV of 6)
+    Iterable<Object> k2 = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object>() {
+          int value = 0;
+          int ndv = 6;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            value = (value + 1) % ndv;
+            return value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+
+    // constant column (always returns 0); the aggregate is count, so the value does not matter
+    Iterable<Object> v = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object>() {
+          int value = 0;
+          int ndv = 1;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            value = (value + 1) % ndv;
+            return value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+
+    // vectorized row batches of 2 rows each
+    FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+        2,
+        new String[] {"long", "long", "long", "long"},
+        k1,
+        k2,
+        v,
+        v); // output col
+
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit: data) {
+      // after 24 rows, all the keys have been seen: 14 entries in the hashmap;
+      // the uncorrected check compares 14 > 24*0.5 = 12 and would switch to
+      // streaming mode, but with the 3 grouping sets the threshold becomes
+      // 24*3*0.5 = 36, so hash mode stays on; if hash mode turned off here,
+      // we'd get 14 + 3*(100-24) output rows instead
+      countRowsProduced += unit.size;
+      vgo.process(unit,  0);
+
+      if (countRowsProduced >= 100) {
+        break;
+      }
+
+    }
+    vgo.close(false);
+    // all grouping sets are flushed on close;
+    // the rollup expands the input keys into 14 distinct output rows
+    assertEquals(1+3+10, outputRowCount);
+  }
+
+  @Test
   public void testMaxHTEntriesFlush() throws HiveException {
 
     List<String> mapColumnNames = new ArrayList<String>();
@@ -450,7 +618,8 @@ public class TestVectorGroupByOperator {
 
     Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "max",
         "Value", TypeInfoFactory.longTypeInfo,
-        "Key", TypeInfoFactory.longTypeInfo);
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
 
@@ -2784,7 +2953,9 @@ public class TestVectorGroupByOperator {
     Set<Object> keys = new HashSet<Object>();
 
     Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, aggregateName, "Value",
-        TypeInfoFactory.longTypeInfo, "Key", TypeInfoFactory.longTypeInfo);
+        TypeInfoFactory.longTypeInfo,
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
 
@@ -2856,7 +3027,9 @@ public class TestVectorGroupByOperator {
     Set<Object> keys = new HashSet<Object>();
 
     Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, aggregateName, "Value",
-       dataTypeInfo, "Key", TypeInfoFactory.stringTypeInfo);
+       dataTypeInfo,
+       new String[] {"Key"},
+       new TypeInfo[] {TypeInfoFactory.stringTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
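
To make the row-versus-key accounting in testRollupAggregation concrete, here is a hedged toy model in plain Java. It is independent of Hive's operator code: the string-encoded keys and grouping-set ids are illustrative assumptions, not Hive's key layout, and the nDVs (2 and 5) follow the example in the commit's comment rather than the test's iterators.

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative only: a toy model of how a rollup's grouping sets multiply
    // intermediate rows while the distinct-key count stays bounded by the nDVs.
    public class RollupExpansionSketch {
      public static void main(String[] args) {
        long intermediateRows = 0;
        Set<String> distinctKeys = new HashSet<>();
        // 10 input rows; k1 has an nDV of 2, k2 an nDV of 5
        for (int i = 0; i < 10; i++) {
          long k1 = i % 2;
          long k2 = i % 5;
          // rollup(k1, k2) expands to the grouping sets (), (k1), (k1, k2);
          // each intermediate key is tagged with a grouping-set id so that
          // keys from different sets never collide
          String[] intermediateKeys = {
              "0:",                   // ()       : both keys masked
              "1:" + k1,              // (k1)     : k2 masked
              "2:" + k1 + ":" + k2    // (k1, k2) : full key
          };
          for (String key : intermediateKeys) {
            intermediateRows++;   // streaming mode would emit every one of these
            distinctKeys.add(key);
          }
        }
        // prints "30 intermediate rows, 13 distinct keys" -- 1 + 2 + (2*5),
        // within the 14-key maximum that the commit's comment derives
        System.out.println(intermediateRows + " intermediate rows, "
            + distinctKeys.size() + " distinct keys");
      }
    }

In streaming mode all 30 intermediate rows would reach the shuffle; the hash table caps the output at the distinct-key count, which is why the patched check multiplies the observed row count by the number of grouping sets before testing the reduction ratio.
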