Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/02 19:12:52 UTC

[hive] branch master updated: HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fd99df  HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)
0fd99df is described below

commit 0fd99df99dc07540d8818d179bcdcb2972f09752
Author: Qiang Kang <tk...@gmail.com>
AuthorDate: Sat May 2 12:12:06 2020 -0700

    HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hadoop/hive/ql/exec/GroupByOperator.java       |  17 +--
 .../apache/hadoop/hive/ql/exec/TestOperators.java  | 131 +++++++++++++++++++++
 2 files changed, 140 insertions(+), 8 deletions(-)
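
Background on the fix: GroupByOperator periodically checks whether map-side hash
aggregation is reducing the row count enough to justify its memory cost. Before
this patch the check compared the hash table size against the raw input row
count, but with grouping sets each input row is inserted into the table once per
grouping set, so the table can legitimately exceed the input count even when
aggregation is working well, and hash aggregation was always disabled. With
illustrative numbers (not taken from the patch): 1000 input rows, 3 grouping
sets, minReductionHashAggr = 0.5:

    hash insertions performed: 1000 rows * 3 grouping sets = 3000
    suppose they collapse to 900 distinct entries (a healthy 70% reduction)
    old check: 900 > 1000 * 0.5 = 500   -> hash aggregation disabled
    new check: 900 > 3000 * 0.5 = 1500  -> false, hash aggregation stays on

The patch implements this by scaling the comparison baseline to
numRowsInput * groupingSets.size(), as shown in the diff below.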

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 7220f33..b94e3fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -739,20 +739,21 @@ public class GroupByOperator extends Operator<GroupByDesc> implements IConfigure
       // if hash aggregation is not behaving properly, disable it
       if (numRowsInput == numRowsCompareHashAggr) {
         numRowsCompareHashAggr += groupbyMapAggrInterval;
+        long numRowsProcessed = groupingSetsPresent ? numRowsInput * groupingSets.size() : numRowsInput;
         // map-side aggregation should reduce the entries by at-least half
-        if (numRowsHashTbl > numRowsInput * minReductionHashAggr) {
+        if (numRowsHashTbl > numRowsProcessed * minReductionHashAggr) {
           LOG.warn("Disable Hash Aggr: #hash table = " + numRowsHashTbl
-              + " #total = " + numRowsInput + " reduction = " + 1.0
-              * (numRowsHashTbl / numRowsInput) + " minReduction = "
-              + minReductionHashAggr);
+              + " #numRowsInput = " + numRowsInput + " reduction = " + 1.0 * (numRowsHashTbl / numRowsProcessed)
+              + " minReduction = " + minReductionHashAggr + " groupingSetsPresent = " + groupingSetsPresent
+              + " numRowsProcessed = " + numRowsProcessed);
           flushHashTable(true);
           hashAggr = false;
         } else {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
-                + " #total = " + numRowsInput + " reduction = " + 1.0
-                * (numRowsHashTbl / numRowsInput) + " minReduction = "
-                + minReductionHashAggr);
+            LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl + " #numRowsInput = " + numRowsInput
+                + " reduction = " + 1.0 * (numRowsHashTbl / numRowsProcessed) + " minReduction = "
+                + minReductionHashAggr + " groupingSetsPresent = " + groupingSetsPresent + " numRowsProcessed = "
+                + numRowsProcessed);
           }
         }
       }
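
As a standalone illustration, here is a minimal sketch of the adjusted check
(simplified field names mirror the diff; the wrapper class and method are
hypothetical, not the actual Hive operator):

    public class HashAggrCheckSketch {
      private long numRowsInput;           // input rows consumed so far
      private long numRowsHashTbl;         // entries currently in the hash table
      private float minReductionHashAggr;  // required reduction ratio
      private boolean groupingSetsPresent;
      private int numGroupingSets;         // groupingSets.size() in the real operator

      // Returns true if map-side hash aggregation should stay enabled.
      boolean shouldKeepHashAggr() {
        // With grouping sets, each input row yields one hash entry per set,
        // so scale the baseline before comparing against the table size.
        long numRowsProcessed =
            groupingSetsPresent ? numRowsInput * numGroupingSets : numRowsInput;
        return numRowsHashTbl <= numRowsProcessed * minReductionHashAggr;
      }
    }
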
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 8a0606b..3c0a7eb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -32,14 +32,18 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
 import org.apache.hadoop.hive.ql.parse.type.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -49,6 +53,7 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -525,4 +530,130 @@ public class TestOperators {
     assertEquals(5,
         convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, null).getMaxExecutorsOverSubscribeMemory());
   }
+
+  @Test public void testHashGroupBy() throws HiveException {
+    InspectableObject[] input = constructHashAggrInputData(5, 3);
+    System.out.println("---------------Begin to Construct Groupby Desc-------------");
+    // 1. Build AggregationDesc
+    String aggregate = "MAX";
+    ExprNodeDesc inputColumn = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col0", "table", false);
+    ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
+    params.add(inputColumn);
+    GenericUDAFEvaluator genericUDAFEvaluator =
+        SemanticAnalyzer.getGenericUDAFEvaluator(aggregate, params, null, false, false);
+    AggregationDesc agg =
+        new AggregationDesc(aggregate, genericUDAFEvaluator, params, false, GenericUDAFEvaluator.Mode.PARTIAL1);
+    ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
+    aggs.add(agg);
+
+    // 2. aggr keys
+    ExprNodeDesc key1 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col1", "table", false);
+    ExprNodeDesc key2 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col2", "table", false);
+    ArrayList<ExprNodeDesc> keys = new ArrayList<>();
+    keys.add(key1);
+    keys.add(key2);
+
+    // 3. outputCols
+    // @see org.apache.hadoop.hive.ql.exec.GroupByOperator.forward
+    // outputColumnNames includes the group by keys followed by the agg evaluators' output cols.
+    ArrayList<String> outputColumnNames = new ArrayList<String>();
+    for (int i = 0; i < keys.size() + aggs.size(); i++) {
+      outputColumnNames.add("_col" + i);
+    }
+    // 4. build GroupByDesc desc
+    GroupByDesc desc = new GroupByDesc();
+    desc.setOutputColumnNames(outputColumnNames);
+    desc.setAggregators(aggs);
+    desc.setKeys(keys);
+    desc.setMode(GroupByDesc.Mode.HASH);
+    desc.setMemoryThreshold(1.0f);
+    desc.setGroupByMemoryUsage(1.0f);
+    // minReductionHashAggr
+    desc.setMinReductionHashAggr(0.5f);
+
+    // 5. Configure Hive conf and build the group by operator
+    HiveConf hconf = new HiveConf();
+    HiveConf.setIntVar(hconf, HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL, 1);
+
+    // 6. test hash aggr without grouping sets
+    System.out.println("---------------Begin to test hash group by without grouping sets-------------");
+    int withoutGroupingSetsExpectSize = 3;
+    GroupByOperator op = new GroupByOperator(new CompilationOpContext());
+    op.setConf(desc);
+    testHashAggr(op, hconf, input, withoutGroupingSetsExpectSize);
+
+    // 7. test hash aggr with grouping sets
+    System.out.println("---------------Begin to test hash group by with grouping sets------------");
+    int groupingSetsExpectSize = 6;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // groupingSets
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+    keys.add(groupingSetDummyKey);
+    desc.setKeys(keys);
+    // grouping set position
+    desc.setGroupingSetPosition(2);
+    op = new GroupByOperator(new CompilationOpContext());
+    op.setConf(desc);
+    testHashAggr(op, hconf, input, groupingSetsExpectSize);
+  }
+
+  private void testHashAggr(GroupByOperator op, HiveConf hconf, InspectableObject[] r, int expectOutputSize)
+      throws HiveException {
+    // 1. Collect operator to observe the output of the group by operator
+    CollectDesc cd = new CollectDesc(expectOutputSize + 10);
+    CollectOperator cdop = (CollectOperator) OperatorFactory.getAndMakeChild(cd, op);
+    op.initialize(hconf, new ObjectInspector[] { r[0].oi });
+    // 2. Process the rows; hash aggregation may be disabled internally along the way
+    for (int i = 0; i < r.length; i++) {
+      op.process(r[i].o, 0);
+    }
+    op.close(false);
+    InspectableObject io = new InspectableObject();
+    int output = 0;
+    // 3. Print group by results
+    do {
+      cdop.retrieve(io);
+      if (io.o != null) {
+        System.out.println("io.o = " + io.o);
+        output++;
+      }
+    } while (io.o != null);
+    // 4. Check partial result size
+    assertEquals(expectOutputSize, output);
+  }
+
+  private InspectableObject[] constructHashAggrInputData(int rowNum, int rowNumWithSameKeys) {
+    InspectableObject[] r;
+    r = new InspectableObject[rowNum];
+    ArrayList<String> names = new ArrayList<String>(3);
+    names.add("col0");
+    names.add("col1");
+    names.add("col2");
+    ArrayList<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>(3);
+    objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    // first rowNumWithSameKeys rows share the same col1 and col2 values
+    for (int i = 0; i < rowNum; i++) {
+      ArrayList<String> data = new ArrayList<String>();
+      data.add("" + i);
+      data.add("" + (i < rowNumWithSameKeys ? -1 : i));
+      data.add("" + (i < rowNumWithSameKeys ? -1 : i));
+      try {
+        r[i] = new InspectableObject();
+        r[i].o = data;
+        r[i].oi = ObjectInspectorFactory.getStandardStructObjectInspector(names, objectInspectors);
+      } catch (Throwable e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return r;
+  }
+
 }
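
To run the new test locally, a standard surefire invocation from the root of a
Hive checkout should work (the module name ql is taken from the paths above):

    mvn test -Dtest=TestOperators#testHashGroupBy -pl ql

The input data makes the expected sizes easy to verify: of the 5 rows, the
first 3 share the same (col1, col2) pair, so the plain group by emits 3 groups,
while each of the two grouping sets (masks 1 and 2) groups on a single key with
3 distinct values, for 6 output rows in total.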