Posted to commits@hive.apache.org by jc...@apache.org on 2019/11/08 08:09:10 UTC

[hive] branch master updated: HIVE-22448: CBO: Expand the multiple count distinct with a group-by key (Jesus Camacho Rodriguez, reviewed by Vineet Garg)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e7d2cd2  HIVE-22448: CBO: Expand the multiple count distinct with a group-by key (Jesus Camacho Rodriguez, reviewed by Vineet Garg)
e7d2cd2 is described below

commit e7d2cd2c05abf4ac52393095cf93c74833f640df
Author: Jesus Camacho Rodriguez <jc...@apache.org>
AuthorDate: Fri Nov 1 18:54:48 2019 -0700

    HIVE-22448: CBO: Expand the multiple count distinct with a group-by key (Jesus Camacho Rodriguez, reviewed by Vineet Garg)
    
    Close apache/hive#838
---
 .../test/resources/testconfiguration.properties    |   1 +
 .../rules/HiveExpandDistinctAggregatesRule.java    |  96 ++++----
 .../queries/clientpositive/multigroupbydistinct.q  |  60 +++++
 .../clientpositive/llap/limit_pushdown.q.out       |  57 +++--
 .../clientpositive/llap/limit_pushdown3.q.out      |  49 ++--
 .../clientpositive/llap/multigroupbydistinct.q.out | 271 +++++++++++++++++++++
 .../llap/offset_limit_ppd_optimizer.q.out          |  55 +++--
 .../llap/reduce_deduplicate_distinct.q.out         | 254 ++++++++++++-------
 .../spark/auto_join18_multi_distinct.q.out         |  44 ++--
 .../spark/join18_multi_distinct.q.out              |  44 ++--
 .../clientpositive/spark/limit_pushdown.q.out      |  55 +++--
 11 files changed, 723 insertions(+), 263 deletions(-)
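
Before this patch, HiveExpandDistinctAggregatesRule only applied its grouping-sets expansion when the aggregate had no group-by keys at all (the old code required an empty group set). The change folds the original group-by keys into every grouping set, so queries that combine group-by keys with multiple count(distinct) calls are expanded too. A minimal sketch of the rewrite, written against the table from the new multigroupbydistinct.q test; the expanded form is illustrative of the plan shape in the new q.out files, not literal Hive output, and the grouping-id literals follow the encoding computed by getGroupingIdValue in the patch:

    -- original query
    select z, count(distinct y), count(distinct a)
    from tabw4intcols
    group by z;

    -- roughly what the rule produces: one grouping set per distinct
    -- column (each including the original key z), a GROUPING__ID column,
    -- and one conditional count per grouping set on top
    select z,
           count(case when gid = 1 and y is not null then 1 end),
           count(case when gid = 2 and a is not null then 1 end)
    from (select z, y, a, grouping__id as gid
          from tabw4intcols
          group by z, y, a
          grouping sets ((z, y), (z, a))) t
    group by z;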

diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e60c4c5..50dcf40 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -640,6 +640,7 @@ minillaplocal.query.files=\
   mm_loaddata.q,\
   mm_loaddata_split_change.q,\
   mrr.q,\
+  multigroupbydistinct.q,\
   multiMapJoin1.q,\
   multiMapJoin2.q,\
   multi_in_clause.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExpandDistinctAggregatesRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExpandDistinctAggregatesRule.java
index 103d5e1..e8b2c37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExpandDistinctAggregatesRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExpandDistinctAggregatesRule.java
@@ -16,21 +16,22 @@
  */
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
 
+import com.google.common.base.Preconditions;
 import java.math.BigDecimal;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Aggregate.Group;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.metadata.RelColumnOrigin;
@@ -44,7 +45,6 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
@@ -58,7 +58,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.math.IntMath;
@@ -112,7 +111,7 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
   public void onMatch(RelOptRuleCall call) {
     final Aggregate aggregate = call.rel(0);
     int numCountDistinct = getNumCountDistinctCall(aggregate);
-    if (numCountDistinct == 0) {
+    if (numCountDistinct == 0 || aggregate.getGroupType() != Group.SIMPLE) {
       return;
     }
 
@@ -121,7 +120,8 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
     int nonDistinctCount = 0;
     List<List<Integer>> argListList = new ArrayList<List<Integer>>();
     Set<List<Integer>> argListSets = new LinkedHashSet<List<Integer>>();
-    Set<Integer> positions = new HashSet<>();
+    ImmutableBitSet.Builder newGroupSet = ImmutableBitSet.builder();
+    newGroupSet.addAll(aggregate.getGroupSet());
     for (AggregateCall aggCall : aggregate.getAggCallList()) {
       if (!aggCall.isDistinct()) {
         ++nonDistinctCount;
@@ -130,33 +130,27 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
       ArrayList<Integer> argList = new ArrayList<Integer>();
       for (Integer arg : aggCall.getArgList()) {
         argList.add(arg);
-        positions.add(arg);
+        newGroupSet.set(arg);
       }
       // Aggr checks for sorted argList.
       argListList.add(argList);
       argListSets.add(argList);
     }
-    Util.permAssert(argListSets.size() > 0, "containsDistinctCall lied");
+    Preconditions.checkArgument(argListSets.size() > 0, "containsDistinctCall lied");
 
-    if (numCountDistinct > 1 && numCountDistinct == aggregate.getAggCallList().size()
-        && aggregate.getGroupSet().isEmpty()) {
+    if (numCountDistinct > 1 && numCountDistinct == aggregate.getAggCallList().size()) {
       LOG.debug("Trigger countDistinct rewrite. numCountDistinct is " + numCountDistinct);
       // now positions contains all the distinct positions, i.e., $5, $4, $6
       // we need to first sort them as group by set
       // and then get their position later, i.e., $4->1, $5->2, $6->3
       cluster = aggregate.getCluster();
       rexBuilder = cluster.getRexBuilder();
-      RelNode converted = null;
-      List<Integer> sourceOfForCountDistinct = new ArrayList<>();
-      sourceOfForCountDistinct.addAll(positions);
-      Collections.sort(sourceOfForCountDistinct);
       try {
-        converted = convert(aggregate, argListList, sourceOfForCountDistinct);
+        call.transformTo(convert(aggregate, argListList, newGroupSet.build()));
       } catch (CalciteSemanticException e) {
         LOG.debug(e.toString());
         throw new RuntimeException(e);
       }
-      call.transformTo(converted);
       return;
     }
 
@@ -200,19 +194,23 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
    * (department_id, gender, education_level))subq;
    * @throws CalciteSemanticException 
    */
-  private RelNode convert(Aggregate aggregate, List<List<Integer>> argList, List<Integer> sourceOfForCountDistinct) throws CalciteSemanticException {
+  private RelNode convert(Aggregate aggregate, List<List<Integer>> argList, ImmutableBitSet newGroupSet)
+      throws CalciteSemanticException {
     // we use this map to map the position of argList to the position of grouping set
     Map<Integer, Integer> map = new HashMap<>();
     List<List<Integer>> cleanArgList = new ArrayList<>();
-    final Aggregate groupingSets = createGroupingSets(aggregate, argList, cleanArgList, map, sourceOfForCountDistinct);
-    return createCount(groupingSets, argList, cleanArgList, map, sourceOfForCountDistinct);
+    final Aggregate groupingSets = createGroupingSets(aggregate, argList, cleanArgList, map, newGroupSet);
+    return createCount(groupingSets, argList, cleanArgList, map, aggregate.getGroupSet(), newGroupSet);
   }
 
-  private int getGroupingIdValue(List<Integer> list, List<Integer> sourceOfForCountDistinct,
+  private int getGroupingIdValue(List<Integer> list, ImmutableBitSet originalGroupSet, ImmutableBitSet newGroupSet,
           int groupCount) {
     int ind = IntMath.pow(2, groupCount) - 1;
+    for (int pos : originalGroupSet) {
+      ind &= ~(1 << groupCount - newGroupSet.indexOf(pos) - 1);
+    }
     for (int i : list) {
-      ind &= ~(1 << groupCount - sourceOfForCountDistinct.indexOf(i) - 1);
+      ind &= ~(1 << groupCount - newGroupSet.indexOf(i) - 1);
     }
     return ind;
   }
@@ -222,28 +220,28 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
    * @param argList: the original argList in aggregate
    * @param cleanArgList: the new argList without duplicates
    * @param map: the mapping from the original argList to the new argList
-   * @param sourceOfForCountDistinct: the sorted positions of groupset
+   * @param newGroupSet: the sorted positions of groupset
    * @return
    * @throws CalciteSemanticException
    */
   private RelNode createCount(Aggregate aggr, List<List<Integer>> argList,
       List<List<Integer>> cleanArgList, Map<Integer, Integer> map,
-      List<Integer> sourceOfForCountDistinct) throws CalciteSemanticException {
-    List<RexNode> originalInputRefs = Lists.transform(aggr.getRowType().getFieldList(),
-        new Function<RelDataTypeField, RexNode>() {
-          @Override
-          public RexNode apply(RelDataTypeField input) {
-            return new RexInputRef(input.getIndex(), input.getType());
-          }
-        });
+      ImmutableBitSet originalGroupSet, ImmutableBitSet newGroupSet) throws CalciteSemanticException {
+    final List<RexNode> originalInputRefs = aggr.getRowType().getFieldList()
+        .stream()
+        .map(input -> new RexInputRef(input.getIndex(), input.getType()))
+        .collect(Collectors.toList());
     final List<RexNode> gbChildProjLst = Lists.newArrayList();
     // for singular arg, count should not include null
     // e.g., count(case when i=1 and department_id is not null then 1 else null end) as c0, 
     // for non-singular args, count can include null, i.e. (,) is counted as 1
     for (List<Integer> list : cleanArgList) {
-      RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, originalInputRefs
-          .get(originalInputRefs.size() - 1), rexBuilder.makeExactLiteral(new BigDecimal(
-          getGroupingIdValue(list, sourceOfForCountDistinct, aggr.getGroupCount()))));
+      RexNode condition = rexBuilder.makeCall(
+          SqlStdOperatorTable.EQUALS,
+          originalInputRefs.get(originalInputRefs.size() - 1),
+          rexBuilder.makeExactLiteral(
+              new BigDecimal(
+                  getGroupingIdValue(list, originalGroupSet, newGroupSet, aggr.getGroupCount()))));
       if (list.size() == 1) {
         int pos = list.get(0);
         RexNode notNull = rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL,
@@ -257,6 +255,10 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
       gbChildProjLst.add(when);
     }
 
+    for (int pos : originalGroupSet) {
+      gbChildProjLst.add(originalInputRefs.get(newGroupSet.indexOf(pos)));
+    }
+
     // create the project before GB
     RelNode gbInputRel = HiveProject.create(aggr, gbChildProjLst, null);
 
@@ -269,23 +271,25 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
           TypeInfoFactory.longTypeInfo, i, aggFnRetType);
       aggregateCalls.add(aggregateCall);
     }
+    ImmutableBitSet groupSet =
+        ImmutableBitSet.range(cleanArgList.size(), cleanArgList.size() + originalGroupSet.cardinality());
     Aggregate aggregate = new HiveAggregate(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), gbInputRel,
-        ImmutableBitSet.of(), null, aggregateCalls);
+        groupSet, null, aggregateCalls);
 
     // create the project after GB. For those repeated values, e.g., select
     // count(distinct x, y), count(distinct y, x), we find the correct mapping.
     if (map.isEmpty()) {
       return aggregate;
     } else {
-      List<RexNode> originalAggrRefs = Lists.transform(aggregate.getRowType().getFieldList(),
-          new Function<RelDataTypeField, RexNode>() {
-            @Override
-            public RexNode apply(RelDataTypeField input) {
-              return new RexInputRef(input.getIndex(), input.getType());
-            }
-          });
+      final List<RexNode> originalAggrRefs = aggregate.getRowType().getFieldList()
+          .stream()
+          .map(input -> new RexInputRef(input.getIndex(), input.getType()))
+          .collect(Collectors.toList());
       final List<RexNode> projLst = Lists.newArrayList();
       int index = 0;
+      for (int i = 0; i < groupSet.cardinality(); i++) {
+        projLst.add(originalAggrRefs.get(index++));
+      }
       for (int i = 0; i < argList.size(); i++) {
         if (map.containsKey(i)) {
           projLst.add(originalAggrRefs.get(map.get(i)));
@@ -302,18 +306,18 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
    * @param argList: the original argList in aggregate
    * @param cleanArgList: the new argList without duplicates
    * @param map: the mapping from the original argList to the new argList
-   * @param sourceOfForCountDistinct: the sorted positions of groupset
+   * @param groupSet: new group set
    * @return
    */
   private Aggregate createGroupingSets(Aggregate aggregate, List<List<Integer>> argList,
       List<List<Integer>> cleanArgList, Map<Integer, Integer> map,
-      List<Integer> sourceOfForCountDistinct) {
-    final ImmutableBitSet groupSet = ImmutableBitSet.of(sourceOfForCountDistinct);
+      ImmutableBitSet groupSet) {
     final List<ImmutableBitSet> origGroupSets = new ArrayList<>();
 
     for (int i = 0; i < argList.size(); i++) {
       List<Integer> list = argList.get(i);
-      ImmutableBitSet bitSet = ImmutableBitSet.of(list);
+      ImmutableBitSet bitSet = aggregate.getGroupSet().union(
+          ImmutableBitSet.of(list));
       int prev = origGroupSets.indexOf(bitSet);
       if (prev == -1) {
         origGroupSets.add(bitSet);
@@ -323,7 +327,7 @@ public final class HiveExpandDistinctAggregatesRule extends RelOptRule {
       }
     }
     // Calcite expects the grouping sets sorted and without duplicates
-    Collections.sort(origGroupSets, ImmutableBitSet.COMPARATOR);
+    origGroupSets.sort(ImmutableBitSet.COMPARATOR);
 
     List<AggregateCall> aggregateCalls = new ArrayList<AggregateCall>();
     // Create GroupingID column
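
For reference, the literals that the rewritten plans compare GROUPING__ID against (the =($3, 1) and =($3, 2) predicates in the new multigroupbydistinct.q.out) come straight from getGroupingIdValue above: start from 2^groupCount - 1 with all bits set, then clear bit (groupCount - index - 1) for every column in the grouping set, original group-by keys included. A worked sketch of that arithmetic for the first test query, where the new group set is (z, y, a) and z is the original key, expressed with Hive's bitwise operators (shiftleft(1, n) is 1 << n):

    select 7 & ~shiftleft(1, 2) & ~shiftleft(1, 1) as gid_for_z_y,  -- clears z, y -> 1
           7 & ~shiftleft(1, 2) & ~shiftleft(1, 0) as gid_for_z_a;  -- clears z, a -> 2
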
diff --git a/ql/src/test/queries/clientpositive/multigroupbydistinct.q b/ql/src/test/queries/clientpositive/multigroupbydistinct.q
new file mode 100644
index 0000000..dbd81ba
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/multigroupbydistinct.q
@@ -0,0 +1,60 @@
+create table tabw4intcols (x integer, y integer, z integer, a integer);
+insert into tabw4intcols values (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3), (4, 4, 4, 4),
+  (1, 2, 1, 2), (2, 3, 2, 3), (3, 4, 3, 4), (4, 1, 4, 1),
+  (1, 2, 3, 4), (4, 3, 2, 1), (1, 2, 3, 4), (4, 3, 2, 1);
+
+explain cbo
+select z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z;
+
+select z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z;
+
+explain cbo
+select z, x, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x;
+
+select z, x, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x;
+
+explain cbo
+select x, z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x;
+
+select x, z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x;
+
+explain cbo
+select x, a, y, count(distinct z)
+from tabw4intcols
+group by a, x, y;
+
+select x, a, y, count(distinct z)
+from tabw4intcols
+group by a, x, y;
+
+explain cbo
+select x, count(distinct y), z, count(distinct a)
+from tabw4intcols
+group by z, x;
+
+select x, count(distinct y), z, count(distinct a)
+from tabw4intcols
+group by z, x;
+
+explain cbo
+select count(distinct y), x, z, count(distinct a)
+from tabw4intcols
+group by z, x;
+
+select count(distinct y), x, z, count(distinct a)
+from tabw4intcols
+group by z, x;
+
+drop table tabw4intcols;
diff --git a/ql/src/test/results/clientpositive/llap/limit_pushdown.q.out b/ql/src/test/results/clientpositive/llap/limit_pushdown.q.out
index 041bb28..23038f0 100644
--- a/ql/src/test/results/clientpositive/llap/limit_pushdown.q.out
+++ b/ql/src/test/results/clientpositive/llap/limit_pushdown.q.out
@@ -734,42 +734,49 @@ STAGE PLANS:
                   Statistics: Num rows: 12288 Data size: 1779850 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
-                    outputColumnNames: ctinyint, cstring1, cstring2
+                    outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 12288 Data size: 1779850 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: count(DISTINCT cstring1), count(DISTINCT cstring2)
-                      keys: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
+                      keys: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), 0L (type: bigint)
                       minReductionHashAggr: 0.0
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                      Statistics: Num rows: 12288 Data size: 1976458 Basic stats: COMPLETE Column stats: COMPLETE
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
-                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string)
-                        sort order: +++
+                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), _col3 (type: bigint)
+                        sort order: ++++
                         Map-reduce partition columns: _col0 (type: tinyint)
-                        Statistics: Num rows: 12288 Data size: 1976458 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.3
-            Execution mode: llap
+                        Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: tinyint)
+                keys: KEY._col0 (type: tinyint), KEY._col1 (type: string), KEY._col2 (type: string), KEY._col3 (type: bigint)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
-                Limit
-                  Number of rows: 20
-                  Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: tinyint)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: tinyint)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
+                    Limit
+                      Number of rows: 20
+                      Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
diff --git a/ql/src/test/results/clientpositive/llap/limit_pushdown3.q.out b/ql/src/test/results/clientpositive/llap/limit_pushdown3.q.out
index 2d2b6eb..ebf6567 100644
--- a/ql/src/test/results/clientpositive/llap/limit_pushdown3.q.out
+++ b/ql/src/test/results/clientpositive/llap/limit_pushdown3.q.out
@@ -804,38 +804,45 @@ STAGE PLANS:
                   Statistics: Num rows: 12288 Data size: 1779850 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
-                    outputColumnNames: ctinyint, cstring1, cstring2
+                    outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 12288 Data size: 1779850 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: count(DISTINCT cstring1), count(DISTINCT cstring2)
-                      keys: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
+                      keys: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), 0L (type: bigint)
                       minReductionHashAggr: 0.0
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                      Statistics: Num rows: 12288 Data size: 1976458 Basic stats: COMPLETE Column stats: COMPLETE
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
-                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string)
-                        sort order: +++
+                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), _col3 (type: bigint)
+                        sort order: ++++
                         Map-reduce partition columns: _col0 (type: tinyint)
-                        Statistics: Num rows: 12288 Data size: 1976458 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.3
-            Execution mode: llap
+                        Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: tinyint)
+                keys: KEY._col0 (type: tinyint), KEY._col1 (type: string), KEY._col2 (type: string), KEY._col3 (type: bigint)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
-                Reduce Output Operator
-                  key expressions: _col0 (type: tinyint)
-                  sort order: +
-                  Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
-                  TopN Hash Memory Usage: 0.3
-                  value expressions: _col1 (type: bigint), _col2 (type: bigint)
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: tinyint)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: tinyint)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: tinyint)
+                      sort order: +
+                      Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
+                      TopN Hash Memory Usage: 0.3
+                      value expressions: _col1 (type: bigint), _col2 (type: bigint)
         Reducer 3 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
diff --git a/ql/src/test/results/clientpositive/llap/multigroupbydistinct.q.out b/ql/src/test/results/clientpositive/llap/multigroupbydistinct.q.out
new file mode 100644
index 0000000..6710ab2
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/multigroupbydistinct.q.out
@@ -0,0 +1,271 @@
+PREHOOK: query: create table tabw4intcols (x integer, y integer, z integer, a integer)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tabw4intcols
+POSTHOOK: query: create table tabw4intcols (x integer, y integer, z integer, a integer)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tabw4intcols
+PREHOOK: query: insert into tabw4intcols values (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3), (4, 4, 4, 4),
+  (1, 2, 1, 2), (2, 3, 2, 3), (3, 4, 3, 4), (4, 1, 4, 1),
+  (1, 2, 3, 4), (4, 3, 2, 1), (1, 2, 3, 4), (4, 3, 2, 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tabw4intcols
+POSTHOOK: query: insert into tabw4intcols values (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3), (4, 4, 4, 4),
+  (1, 2, 1, 2), (2, 3, 2, 3), (3, 4, 3, 4), (4, 1, 4, 1),
+  (1, 2, 3, 4), (4, 3, 2, 1), (1, 2, 3, 4), (4, 3, 2, 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tabw4intcols
+POSTHOOK: Lineage: tabw4intcols.a SCRIPT []
+POSTHOOK: Lineage: tabw4intcols.x SCRIPT []
+POSTHOOK: Lineage: tabw4intcols.y SCRIPT []
+POSTHOOK: Lineage: tabw4intcols.z SCRIPT []
+PREHOOK: query: explain cbo
+select z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: explain cbo
+select z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+CBO PLAN:
+HiveAggregate(group=[{2}], agg#0=[count($0)], agg#1=[count($1)])
+  HiveProject($f0=[CASE(AND(=($3, 1), IS NOT NULL($1)), 1, null:INTEGER)], $f1=[CASE(AND(=($3, 2), IS NOT NULL($2)), 1, null:INTEGER)], $f2=[$0])
+    HiveAggregate(group=[{0, 1, 2}], groups=[[{0, 1}, {0, 2}]], GROUPING__ID=[GROUPING__ID()])
+      HiveProject($f0=[$2], $f1=[$1], $f2=[$3])
+        HiveTableScan(table=[[default, tabw4intcols]], table:alias=[tabw4intcols])
+
+PREHOOK: query: select z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: select z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+1	2	2
+2	2	3
+3	3	2
+4	2	2
+PREHOOK: query: explain cbo
+select z, x, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: explain cbo
+select z, x, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+CBO PLAN:
+HiveAggregate(group=[{2, 3}], agg#0=[count($0)], agg#1=[count($1)])
+  HiveProject($f0=[CASE(AND(=($4, 1), IS NOT NULL($2)), 1, null:INTEGER)], $f1=[CASE(AND(=($4, 2), IS NOT NULL($3)), 1, null:INTEGER)], $f2=[$0], $f3=[$1])
+    HiveAggregate(group=[{0, 1, 2, 3}], groups=[[{0, 1, 2}, {0, 1, 3}]], GROUPING__ID=[GROUPING__ID()])
+      HiveProject($f0=[$2], $f1=[$0], $f2=[$1], $f3=[$3])
+        HiveTableScan(table=[[default, tabw4intcols]], table:alias=[tabw4intcols])
+
+PREHOOK: query: select z, x, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: select z, x, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+1	1	2	2
+2	2	2	2
+2	4	1	1
+3	1	1	1
+3	3	2	2
+4	4	2	2
+PREHOOK: query: explain cbo
+select x, z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: explain cbo
+select x, z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+CBO PLAN:
+HiveProject(x=[$1], z=[$0], _o__c2=[$2], _o__c3=[$3])
+  HiveAggregate(group=[{2, 3}], agg#0=[count($0)], agg#1=[count($1)])
+    HiveProject($f0=[CASE(AND(=($4, 1), IS NOT NULL($2)), 1, null:INTEGER)], $f1=[CASE(AND(=($4, 2), IS NOT NULL($3)), 1, null:INTEGER)], $f2=[$0], $f3=[$1])
+      HiveAggregate(group=[{0, 1, 2, 3}], groups=[[{0, 1, 2}, {0, 1, 3}]], GROUPING__ID=[GROUPING__ID()])
+        HiveProject($f0=[$2], $f1=[$0], $f2=[$1], $f3=[$3])
+          HiveTableScan(table=[[default, tabw4intcols]], table:alias=[tabw4intcols])
+
+PREHOOK: query: select x, z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: select x, z, count(distinct y), count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+1	1	2	2
+2	2	2	2
+4	2	1	1
+1	3	1	1
+3	3	2	2
+4	4	2	2
+PREHOOK: query: explain cbo
+select x, a, y, count(distinct z)
+from tabw4intcols
+group by a, x, y
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: explain cbo
+select x, a, y, count(distinct z)
+from tabw4intcols
+group by a, x, y
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+CBO PLAN:
+HiveProject(x=[$0], a=[$2], y=[$1], _o__c3=[$3])
+  HiveAggregate(group=[{0, 1, 3}], agg#0=[count($2)])
+    HiveProject(x=[$0], y=[$1], z=[$2], a=[$3])
+      HiveAggregate(group=[{0, 1, 2, 3}])
+        HiveTableScan(table=[[default, tabw4intcols]], table:alias=[tabw4intcols])
+
+PREHOOK: query: select x, a, y, count(distinct z)
+from tabw4intcols
+group by a, x, y
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: select x, a, y, count(distinct z)
+from tabw4intcols
+group by a, x, y
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+1	1	1	1
+1	2	2	1
+1	4	2	1
+2	2	2	1
+2	3	3	1
+3	3	3	1
+3	4	4	1
+4	1	1	1
+4	1	3	1
+4	4	4	1
+PREHOOK: query: explain cbo
+select x, count(distinct y), z, count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: explain cbo
+select x, count(distinct y), z, count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+CBO PLAN:
+HiveProject(x=[$1], _o__c1=[$2], z=[$0], _o__c3=[$3])
+  HiveAggregate(group=[{2, 3}], agg#0=[count($0)], agg#1=[count($1)])
+    HiveProject($f0=[CASE(AND(=($4, 1), IS NOT NULL($2)), 1, null:INTEGER)], $f1=[CASE(AND(=($4, 2), IS NOT NULL($3)), 1, null:INTEGER)], $f2=[$0], $f3=[$1])
+      HiveAggregate(group=[{0, 1, 2, 3}], groups=[[{0, 1, 2}, {0, 1, 3}]], GROUPING__ID=[GROUPING__ID()])
+        HiveProject($f0=[$2], $f1=[$0], $f2=[$1], $f3=[$3])
+          HiveTableScan(table=[[default, tabw4intcols]], table:alias=[tabw4intcols])
+
+PREHOOK: query: select x, count(distinct y), z, count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: select x, count(distinct y), z, count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+1	2	1	2
+2	2	2	2
+4	1	2	1
+1	1	3	1
+3	2	3	2
+4	2	4	2
+PREHOOK: query: explain cbo
+select count(distinct y), x, z, count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: explain cbo
+select count(distinct y), x, z, count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+CBO PLAN:
+HiveProject(_o__c0=[$2], x=[$1], z=[$0], _o__c3=[$3])
+  HiveAggregate(group=[{2, 3}], agg#0=[count($0)], agg#1=[count($1)])
+    HiveProject($f0=[CASE(AND(=($4, 1), IS NOT NULL($2)), 1, null:INTEGER)], $f1=[CASE(AND(=($4, 2), IS NOT NULL($3)), 1, null:INTEGER)], $f2=[$0], $f3=[$1])
+      HiveAggregate(group=[{0, 1, 2, 3}], groups=[[{0, 1, 2}, {0, 1, 3}]], GROUPING__ID=[GROUPING__ID()])
+        HiveProject($f0=[$2], $f1=[$0], $f2=[$1], $f3=[$3])
+          HiveTableScan(table=[[default, tabw4intcols]], table:alias=[tabw4intcols])
+
+PREHOOK: query: select count(distinct y), x, z, count(distinct a)
+from tabw4intcols
+group by z, x
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct y), x, z, count(distinct a)
+from tabw4intcols
+group by z, x
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tabw4intcols
+#### A masked pattern was here ####
+2	1	1	2
+2	2	2	2
+1	4	2	1
+1	1	3	1
+2	3	3	2
+2	4	4	2
+PREHOOK: query: drop table tabw4intcols
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@tabw4intcols
+PREHOOK: Output: default@tabw4intcols
+POSTHOOK: query: drop table tabw4intcols
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@tabw4intcols
+POSTHOOK: Output: default@tabw4intcols
diff --git a/ql/src/test/results/clientpositive/llap/offset_limit_ppd_optimizer.q.out b/ql/src/test/results/clientpositive/llap/offset_limit_ppd_optimizer.q.out
index 1e0aa93..34c572e 100644
--- a/ql/src/test/results/clientpositive/llap/offset_limit_ppd_optimizer.q.out
+++ b/ql/src/test/results/clientpositive/llap/offset_limit_ppd_optimizer.q.out
@@ -741,43 +741,50 @@ STAGE PLANS:
                   Statistics: Num rows: 12288 Data size: 1779850 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
-                    outputColumnNames: ctinyint, cstring1, cstring2
+                    outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 12288 Data size: 1779850 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: count(DISTINCT cstring1), count(DISTINCT cstring2)
-                      keys: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
+                      keys: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), 0L (type: bigint)
                       minReductionHashAggr: 0.0
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                      Statistics: Num rows: 12288 Data size: 1976458 Basic stats: COMPLETE Column stats: COMPLETE
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
-                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string)
-                        sort order: +++
+                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), _col3 (type: bigint)
+                        sort order: ++++
                         Map-reduce partition columns: _col0 (type: tinyint)
-                        Statistics: Num rows: 12288 Data size: 1976458 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.3
+                        Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
             Execution mode: llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: tinyint)
+                keys: KEY._col0 (type: tinyint), KEY._col1 (type: string), KEY._col2 (type: string), KEY._col3 (type: bigint)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
-                Limit
-                  Number of rows: 20
-                  Offset of rows: 10
-                  Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: tinyint)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 24576 Data size: 3756114 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: tinyint)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 131 Data size: 2492 Basic stats: COMPLETE Column stats: COMPLETE
+                    Limit
+                      Number of rows: 20
+                      Offset of rows: 10
+                      Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 20 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
diff --git a/ql/src/test/results/clientpositive/llap/reduce_deduplicate_distinct.q.out b/ql/src/test/results/clientpositive/llap/reduce_deduplicate_distinct.q.out
index 2bacc42..bd96597 100644
--- a/ql/src/test/results/clientpositive/llap/reduce_deduplicate_distinct.q.out
+++ b/ql/src/test/results/clientpositive/llap/reduce_deduplicate_distinct.q.out
@@ -48,38 +48,46 @@ STAGE PLANS:
                   Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: id (type: int), key (type: int), name (type: int)
-                    outputColumnNames: id, key, name
+                    outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: count(DISTINCT key), count(DISTINCT name)
-                      keys: id (type: int), key (type: int), name (type: int)
-                      minReductionHashAggr: 0.6
+                      keys: _col0 (type: int), _col1 (type: int), _col2 (type: int), 0L (type: bigint)
+                      minReductionHashAggr: 0.0
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                      Statistics: Num rows: 2 Data size: 56 Basic stats: COMPLETE Column stats: COMPLETE
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 5 Data size: 100 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
-                        key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int)
-                        sort order: +++
+                        key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int), _col3 (type: bigint)
+                        sort order: ++++
                         Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 2 Data size: 56 Basic stats: COMPLETE Column stats: COMPLETE
-            Execution mode: llap
+                        Statistics: Num rows: 5 Data size: 100 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: int)
+                keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int), KEY._col3 (type: bigint)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 5 Data size: 100 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: int)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 5 Data size: 100 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: int)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -148,7 +156,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int)
@@ -156,33 +164,41 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 2 Data size: 24 Basic stats: COMPLETE Column stats: COMPLETE
                 Group By Operator
-                  aggregations: count(DISTINCT _col1), count(DISTINCT _col2)
-                  keys: _col0 (type: int), _col1 (type: int), _col2 (type: int)
-                  minReductionHashAggr: 0.5
+                  keys: _col0 (type: int), _col1 (type: int), _col2 (type: int), 0L (type: bigint)
+                  minReductionHashAggr: 0.0
                   mode: hash
-                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                  Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
-                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int)
-                    sort order: +++
+                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int), _col3 (type: bigint)
+                    sort order: ++++
                     Map-reduce partition columns: _col0 (type: int)
-                    Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: int)
+                keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int), KEY._col3 (type: bigint)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: int)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: int)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -238,7 +254,7 @@ STAGE PLANS:
                     outputColumnNames: id, key, name
                     Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      keys: id (type: int), key (type: int), name (type: int)
+                      keys: id (type: int), name (type: int), key (type: int)
                       minReductionHashAggr: 0.6
                       mode: hash
                       outputColumnNames: _col0, _col1, _col2
@@ -251,7 +267,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int)
@@ -259,33 +275,41 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 2 Data size: 24 Basic stats: COMPLETE Column stats: COMPLETE
                 Group By Operator
-                  aggregations: count(DISTINCT _col2), count(DISTINCT _col1)
-                  keys: _col0 (type: int), _col2 (type: int), _col1 (type: int)
-                  minReductionHashAggr: 0.5
+                  keys: _col0 (type: int), _col1 (type: int), _col2 (type: int), 0L (type: bigint)
+                  minReductionHashAggr: 0.0
                   mode: hash
-                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                  Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
-                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int)
-                    sort order: +++
+                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int), _col3 (type: bigint)
+                    sort order: ++++
                     Map-reduce partition columns: _col0 (type: int)
-                    Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: int)
+                keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int), KEY._col3 (type: bigint)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: int)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: int)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -329,6 +353,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -354,7 +379,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int)
@@ -362,22 +387,46 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 2 Data size: 24 Basic stats: COMPLETE Column stats: COMPLETE
                 Group By Operator
-                  aggregations: count(DISTINCT _col1), count(DISTINCT _col2)
-                  keys: _col0 (type: int), _col1 (type: int), _col2 (type: int)
-                  minReductionHashAggr: 0.5
+                  keys: _col0 (type: int), _col1 (type: int), _col2 (type: int), 0L (type: bigint)
+                  minReductionHashAggr: 0.0
                   mode: hash
-                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                  Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
-                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int)
-                    sort order: +++
-                    Map-reduce partition columns: _col0 (type: int)
-                    Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int), _col3 (type: bigint)
+                    sort order: ++++
+                    Map-reduce partition columns: _col0 (type: int), _col1 (type: int), _col2 (type: int), _col3 (type: bigint)
+                    Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
+                keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int), KEY._col3 (type: bigint)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: int)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: int)
+                    minReductionHashAggr: 0.5
+                    mode: hash
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int)
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                      value expressions: _col1 (type: bigint), _col2 (type: bigint)
+        Reducer 4 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0), count(VALUE._col1)
                 keys: KEY._col0 (type: int)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
@@ -432,6 +481,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -444,7 +494,7 @@ STAGE PLANS:
                     outputColumnNames: id, key, name
                     Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      keys: id (type: int), key (type: int), name (type: int)
+                      keys: id (type: int), name (type: int), key (type: int)
                       minReductionHashAggr: 0.6
                       mode: hash
                       outputColumnNames: _col0, _col1, _col2
@@ -457,7 +507,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int)
@@ -465,22 +515,46 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 2 Data size: 24 Basic stats: COMPLETE Column stats: COMPLETE
                 Group By Operator
-                  aggregations: count(DISTINCT _col2), count(DISTINCT _col1)
-                  keys: _col0 (type: int), _col2 (type: int), _col1 (type: int)
-                  minReductionHashAggr: 0.5
+                  keys: _col0 (type: int), _col1 (type: int), _col2 (type: int), 0L (type: bigint)
+                  minReductionHashAggr: 0.0
                   mode: hash
-                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                  Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
-                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int)
-                    sort order: +++
-                    Map-reduce partition columns: _col0 (type: int)
-                    Statistics: Num rows: 1 Data size: 28 Basic stats: COMPLETE Column stats: COMPLETE
+                    key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int), _col3 (type: bigint)
+                    sort order: ++++
+                    Map-reduce partition columns: _col0 (type: int), _col1 (type: int), _col2 (type: int), _col3 (type: bigint)
+                    Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: int), KEY._col3 (type: bigint)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: int)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 2 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: int)
+                    minReductionHashAggr: 0.5
+                    mode: hash
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int)
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                      value expressions: _col1 (type: bigint), _col2 (type: bigint)
+        Reducer 4 
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
+                aggregations: count(VALUE._col0), count(VALUE._col1)
                 keys: KEY._col0 (type: int)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
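
Two variants of the rewritten plan appear above. In the first, the expansion
partitions on _col0 alone, so the final aggregation runs in the same reducer
with mode: complete; in the others, the expanded aggregate partitions on all
four keys, and an extra stage (Reducer 4) re-shuffles on the original group-by
key before the final merge. A hedged trace with toy rows (assumed, not taken
from the test) shows why the expansion preserves the distinct counts:

    -- Toy input for one group, gby = 1:
    --   (1, x, p), (1, x, q)
    -- After GROUP BY gby, a, b GROUPING SETS ((gby, a), (gby, b)):
    --   (1, x, NULL)  gid_a        -- duplicates of a collapse in this set
    --   (1, NULL, p)  gid_b
    --   (1, NULL, q)  gid_b
    -- Final aggregate for gby = 1:
    --   count of gid_a rows with a not null = 1   -> count(DISTINCT a)
    --   count of gid_b rows with b not null = 2   -> count(DISTINCT b)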
diff --git a/ql/src/test/results/clientpositive/spark/auto_join18_multi_distinct.q.out b/ql/src/test/results/clientpositive/spark/auto_join18_multi_distinct.q.out
index 042e4fe..df7f9bc 100644
--- a/ql/src/test/results/clientpositive/spark/auto_join18_multi_distinct.q.out
+++ b/ql/src/test/results/clientpositive/spark/auto_join18_multi_distinct.q.out
@@ -76,20 +76,20 @@ STAGE PLANS:
                   Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
+                    outputColumnNames: _col0, _col1
                     Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
-                      aggregations: count(DISTINCT value), count(DISTINCT key)
-                      keys: key (type: string), value (type: string)
+                      keys: _col0 (type: string), _col1 (type: string), 0L (type: bigint)
                       minReductionHashAggr: 0.99
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3
-                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 50 Data size: 382 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
-                        key expressions: _col0 (type: string), _col1 (type: string)
-                        sort order: ++
+                        key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
+                        sort order: +++
                         Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                        Statistics: Num rows: 50 Data size: 382 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized
         Reducer 2 
             Execution mode: vectorized
             Reduce Operator Tree:
@@ -145,19 +145,29 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 6 
+            Execution mode: vectorized
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: string)
+                keys: KEY._col0 (type: string), KEY._col1 (type: string), KEY._col2 (type: bigint)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: string)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: string)
-                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col1 (type: bigint), _col2 (type: bigint)
+                Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: CASE WHEN (((_col2 = 0L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col2 = 1L) and _col0 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: string)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: string)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col1 (type: bigint), _col2 (type: bigint)
 
   Stage: Stage-0
     Fetch Operator
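
In the plan above, one DISTINCT column coincides with the group-by key
(count(DISTINCT value) and count(DISTINCT key), grouped by key), so the
expansion needs only the sets (key, value) and (key). A sketch of the rewritten
aggregate subquery, assuming an input shaped like the test's src1(key, value):

    SELECT key,
           count(CASE WHEN gid = 0 AND value IS NOT NULL THEN 1 END) AS cd_value,
           count(CASE WHEN gid = 1 AND key IS NOT NULL THEN 1 END) AS cd_key
    FROM (SELECT key, value, GROUPING__ID AS gid
            FROM src1
           GROUP BY key, value GROUPING SETS ((key, value), (key))) e
    GROUP BY key;

The 0/1 constants follow the CASE predicates in the plan rather than any
documented encoding.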
diff --git a/ql/src/test/results/clientpositive/spark/join18_multi_distinct.q.out b/ql/src/test/results/clientpositive/spark/join18_multi_distinct.q.out
index 4347144..fe8d3f1 100644
--- a/ql/src/test/results/clientpositive/spark/join18_multi_distinct.q.out
+++ b/ql/src/test/results/clientpositive/spark/join18_multi_distinct.q.out
@@ -75,20 +75,20 @@ STAGE PLANS:
                   Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
+                    outputColumnNames: _col0, _col1
                     Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
-                      aggregations: count(DISTINCT value), count(DISTINCT key)
-                      keys: key (type: string), value (type: string)
+                      keys: _col0 (type: string), _col1 (type: string), 0L (type: bigint)
                       minReductionHashAggr: 0.99
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3
-                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 50 Data size: 382 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
-                        key expressions: _col0 (type: string), _col1 (type: string)
-                        sort order: ++
+                        key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
+                        sort order: +++
                         Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                        Statistics: Num rows: 50 Data size: 382 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized
         Reducer 2 
             Execution mode: vectorized
             Reduce Operator Tree:
@@ -122,19 +122,29 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 5 
+            Execution mode: vectorized
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: string)
+                keys: KEY._col0 (type: string), KEY._col1 (type: string), KEY._col2 (type: bigint)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: string)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: string)
-                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col1 (type: bigint), _col2 (type: bigint)
+                Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: CASE WHEN (((_col2 = 0L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col2 = 1L) and _col0 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: string)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: string)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col1 (type: bigint), _col2 (type: bigint)
 
   Stage: Stage-0
     Fetch Operator
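
A visible cost of the rewrite shows up in the statistics: every input row is
emitted once per grouping set, so the map-side row estimate doubles. In the
join18 plan above, and in the limit_pushdown plan that follows:

    25 rows    x 2 grouping sets = 50 rows      (Data size 191 -> 382)
    12288 rows x 2 grouping sets = 24576 rows   (Data size 2907994 -> 5815988)

Note also that the TopN Hash Memory Usage hint disappears from the map-side
Reduce Output Operator in limit_pushdown, presumably because the limit can no
longer be pushed below the grouping-set expansion.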
diff --git a/ql/src/test/results/clientpositive/spark/limit_pushdown.q.out b/ql/src/test/results/clientpositive/spark/limit_pushdown.q.out
index 693198e..8336176 100644
--- a/ql/src/test/results/clientpositive/spark/limit_pushdown.q.out
+++ b/ql/src/test/results/clientpositive/spark/limit_pushdown.q.out
@@ -704,39 +704,48 @@ STAGE PLANS:
                   Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
-                    outputColumnNames: ctinyint, cstring1, cstring2
+                    outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
-                      aggregations: count(DISTINCT cstring1), count(DISTINCT cstring2)
-                      keys: ctinyint (type: tinyint), cstring1 (type: string), cstring2 (type: string)
+                      keys: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), 0L (type: bigint)
                       minReductionHashAggr: 0.99
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                      Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 24576 Data size: 5815988 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
-                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string)
-                        sort order: +++
+                        key expressions: _col0 (type: tinyint), _col1 (type: string), _col2 (type: string), _col3 (type: bigint)
+                        sort order: ++++
                         Map-reduce partition columns: _col0 (type: tinyint)
-                        Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
-                        TopN Hash Memory Usage: 0.3
+                        Statistics: Num rows: 24576 Data size: 5815988 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized
         Reducer 2 
+            Execution mode: vectorized
             Reduce Operator Tree:
               Group By Operator
-                aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0)
-                keys: KEY._col0 (type: tinyint)
+                keys: KEY._col0 (type: tinyint), KEY._col1 (type: string), KEY._col2 (type: string), KEY._col3 (type: bigint)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 6144 Data size: 1453997 Basic stats: COMPLETE Column stats: NONE
-                Limit
-                  Number of rows: 20
-                  Statistics: Num rows: 20 Data size: 4720 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 20 Data size: 4720 Basic stats: COMPLETE Column stats: NONE
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: CASE WHEN (((_col3 = 1L) and _col1 is not null)) THEN (1) ELSE (null) END (type: int), CASE WHEN (((_col3 = 2L) and _col2 is not null)) THEN (1) ELSE (null) END (type: int), _col0 (type: tinyint)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: count(_col0), count(_col1)
+                    keys: _col2 (type: tinyint)
+                    mode: complete
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 6144 Data size: 1453997 Basic stats: COMPLETE Column stats: NONE
+                    Limit
+                      Number of rows: 20
+                      Statistics: Num rows: 20 Data size: 4720 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 20 Data size: 4720 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator