You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/04/12 23:10:18 UTC

[6/6] impala git commit: IMPALA-6822: Add a query option to control shuffling by distinct exprs

IMPALA-6822: Add a query option to control shuffling by distinct exprs

IMPALA-4794 changed distinct aggregation to shuffle by both the
grouping exprs and the distinct exprs. That plan is slower for queries
where the NDVs of the grouping exprs are high and the data is uniformly
distributed among groups. This patch adds a query option,
SHUFFLE_DISTINCT_EXPRS, that controls this behavior and lets users
switch back to the old plan.

Change-Id: Icb4b4576fb29edd62cf4b4ba0719c0e0a2a5a8dc
Reviewed-on: http://gerrit.cloudera.org:8080/9949
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9a751f00
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9a751f00
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9a751f00

Branch: refs/heads/master
Commit: 9a751f00b8a399116c12a81e130a696b01eb1ba8
Parents: bc466c2
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Fri Apr 6 17:43:37 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Apr 12 22:01:35 2018 +0000

----------------------------------------------------------------------
 be/src/service/query-options.cc                 |   5 +
 be/src/service/query-options.h                  |   6 +-
 common/thrift/ImpalaInternalService.thrift      |   7 +
 common/thrift/ImpalaService.thrift              |   7 +
 .../impala/planner/DistributedPlanner.java      |  48 ++-
 .../org/apache/impala/planner/PlannerTest.java  |   5 +
 .../PlannerTest/shuffle-by-distinct-exprs.test  | 329 +++++++++++++++++++
 .../queries/QueryTest/distinct.test             |  30 ++
 tests/query_test/test_aggregation.py            |  41 ++-
 9 files changed, 454 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 3c56f89..b219a00 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -632,6 +632,11 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_exec_time_limit_s(time_limit);
         break;
       }
+      case TImpalaQueryOptions::SHUFFLE_DISTINCT_EXPRS: {
+        query_options->__set_shuffle_distinct_exprs(
+                iequals(value, "true") || iequals(value, "1"));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 2280cff..82e04a1 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::EXEC_TIME_LIMIT_S + 1);\
+      TImpalaQueryOptions::SHUFFLE_DISTINCT_EXPRS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
@@ -129,6 +129,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(shuffle_distinct_exprs, SHUFFLE_DISTINCT_EXPRS,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
@@ -149,7 +151,7 @@ std::string DebugQueryOptions(const TQueryOptions& query_options);
 
 /// Bitmask for the values of TQueryOptions.
 /// TODO: Find a way to set the size based on the number of fields.
-typedef std::bitset<64> QueryOptionsMask;
+typedef std::bitset<65> QueryOptionsMask;
 
 /// Updates the query options in dst from those in src where the query option is set
 /// (i.e. src->__isset.PROPERTY is true) and the corresponding bit in mask is set. If

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index e96ed87..8cbc573 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -272,6 +272,13 @@ struct TQueryOptions {
   // not include time spent in planning, scheduling or admission control. A value of 0
   // means no time limit.
   63: optional i32 exec_time_limit_s = 0;
+
+  // When a query has both grouping and distinct exprs, Impala can optionally include the
+  // distinct exprs in the hash exchange of the first aggregation phase to spread the data
+  // among more nodes. However, this plan requires another hash exchange on the grouping
+  // exprs in the second phase which is not required when omitting the distinct exprs in
+  // the first phase. Shuffling by both is better if the grouping exprs have low NDVs.
+  64: optional bool shuffle_distinct_exprs = true;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 356f5e5..e25bd60 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -283,6 +283,13 @@ enum TImpalaQueryOptions {
   // not include time spent in planning, scheduling or admission control. A value of 0
   // means no time limit.
   EXEC_TIME_LIMIT_S,
+
+  // When a query has both grouping and distinct exprs, Impala can optionally include the
+  // distinct exprs in the hash exchange of the first aggregation phase to spread the data
+  // among more nodes. However, this plan requires another hash exchange on the grouping
+  // exprs in the second phase which is not required when omitting the distinct exprs in
+  // the first phase. Shuffling by both is better if the grouping exprs have low NDVs.
+  SHUFFLE_DISTINCT_EXPRS,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 241a71e..b388673 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -883,27 +883,46 @@ public class DistributedPlanner {
   private PlanFragment createPhase2DistinctAggregationFragment(
       AggregationNode phase2AggNode, PlanFragment childFragment,
       ArrayList<PlanFragment> fragments) throws ImpalaException {
+    // When a query has both grouping and distinct exprs, Impala can optionally include
+    // the distinct exprs in the hash exchange of the first aggregation phase to spread
+    // the data among more nodes. However, this plan requires another hash exchange on the
+    // grouping exprs in the second phase which is not required when omitting the distinct
+    // exprs in the first phase. Shuffling by both is better if the grouping exprs have
+    // low NDVs.
+    boolean shuffleDistinctExprs = ctx_.getQueryOptions().shuffle_distinct_exprs ||
+        phase2AggNode.getAggInfo().getGroupingExprs().isEmpty();
     // The phase-1 aggregation node is already in the child fragment.
     Preconditions.checkState(phase2AggNode.getChild(0) == childFragment.getPlanRoot());
 
     AggregateInfo phase1AggInfo = ((AggregationNode) phase2AggNode.getChild(0))
         .getAggInfo();
-    // We need to do
-    // - child fragment:
-    //   * phase-1 aggregation
-    // - first merge fragment, hash-partitioned on grouping and distinct exprs:
-    //   * merge agg of phase-1
-    //   * phase-2 agg
-    // - second merge fragment, partitioned on grouping exprs or unpartitioned
-    //   without grouping exprs
-    //   * merge agg of phase-2
+    ArrayList<Expr> partitionExprs;
     // With grouping, the output partition exprs of the child are the (input) grouping
     // exprs of the parent. The grouping exprs reference the output tuple of phase-1
     // but the partitioning happens on the intermediate tuple of the phase-1.
-    ArrayList<Expr> partitionExprs = Expr.substituteList(
-        phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(),
-        ctx_.getRootAnalyzer(), false);
-
+    if (shuffleDistinctExprs) {
+      // We need to do
+      // - child fragment:
+      //   * phase-1 aggregation
+      // - first merge fragment, hash-partitioned on grouping and distinct exprs:
+      //   * merge agg of phase-1
+      //   * phase-2 agg
+      // - second merge fragment, partitioned on grouping exprs or unpartitioned
+      //   without grouping exprs
+      //   * merge agg of phase-2
+      partitionExprs = Expr.substituteList(
+          phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(),
+          ctx_.getRootAnalyzer(), false);
+    } else {
+      // We need to do
+      // - child fragment:
+      //   * phase-1 aggregation
+      // - merge fragment, hash-partitioned on grouping exprs:
+      //   * merge agg of phase-1
+      //   * phase-2 agg
+      partitionExprs = Expr.substituteList(phase2AggNode.getAggInfo().getGroupingExprs(),
+          phase1AggInfo.getOutputToIntermediateSmap(), ctx_.getRootAnalyzer(), false);
+    }
     PlanFragment firstMergeFragment;
     boolean childHasCompatPartition = ctx_.getRootAnalyzer().setsHaveValueTransfer(
         partitionExprs, childFragment.getDataPartition().getPartitionExprs(), true);
@@ -932,8 +951,9 @@ public class DistributedPlanner {
       // if there is a limit, it had already been placed with the phase-2 aggregation
       // step (which is where it should be)
       firstMergeFragment.addPlanRoot(phase2AggNode);
-      fragments.add(firstMergeFragment);
+      if (shuffleDistinctExprs) fragments.add(firstMergeFragment);
     }
+    if (!shuffleDistinctExprs) return firstMergeFragment;
     phase2AggNode.unsetNeedsFinalize();
     phase2AggNode.setIntermediateTuple();
     // Limit should be applied at the final merge aggregation node

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5dbba75..8b9166f 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -75,6 +75,11 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testShuffleByDistinctExprs() {
+    runPlannerTestFile("shuffle-by-distinct-exprs");
+  }
+
+  @Test
   public void testAggregation() {
     runPlannerTestFile("aggregation");
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test b/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
new file mode 100644
index 0000000..74f09c3
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
@@ -0,0 +1,329 @@
+# Distinct agg without a grouping expr
+select count(distinct int_col) from functional.alltypes;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: count(int_col)
+|
+04:AGGREGATE
+|  group by: int_col
+|
+03:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+select count(distinct int_col) from functional.alltypes;
+---- QUERYOPTIONS
+# An aggregation without grouping always shuffles by the distinct exprs. Setting the
+# option to true doesn't affect the plan.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: count(int_col)
+|
+04:AGGREGATE
+|  group by: int_col
+|
+03:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Distinct agg with a grouping expr
+select count(distinct int_col) from functional.alltypes group by year;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(int_col)
+|  group by: year
+|
+04:AGGREGATE
+|  group by: year, int_col
+|
+03:EXCHANGE [HASH(year)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: year, int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+select count(distinct int_col) from functional.alltypes group by year;
+---- QUERYOPTIONS
+# Shuffling by distinct exprs will create 1 more exchange node and 1 more agg node.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: year
+|
+05:EXCHANGE [HASH(year)]
+|
+02:AGGREGATE [STREAMING]
+|  output: count(int_col)
+|  group by: year
+|
+04:AGGREGATE
+|  group by: year, int_col
+|
+03:EXCHANGE [HASH(year,int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: year, int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Distinct agg without a grouping expr and with a compatible child partition
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.int_col = b.int_col;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE
+|  output: count(a.int_col)
+|
+03:AGGREGATE
+|  group by: a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|
+|--06:EXCHANGE [HASH(b.int_col)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col
+====
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.int_col = b.int_col;
+---- QUERYOPTIONS
+# An aggregation without grouping always shuffles by the distinct exprs. Setting the
+# option to true doesn't affect the plan.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE
+|  output: count(a.int_col)
+|
+03:AGGREGATE
+|  group by: a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|
+|--06:EXCHANGE [HASH(b.int_col)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col
+====
+# Distinct agg with a grouping expr and a compatible child partition
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.year = b.year group by a.year;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE [FINALIZE]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+03:AGGREGATE
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.year = b.year
+|  runtime filters: RF000 <- b.year
+|
+|--06:EXCHANGE [HASH(b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.year
+====
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.year = b.year group by a.year;
+---- QUERYOPTIONS
+# Shuffling by distinct exprs will create 2 more exchange nodes and 2 more agg nodes.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+11:EXCHANGE [UNPARTITIONED]
+|
+10:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|  group by: a.year
+|
+09:EXCHANGE [HASH(a.year)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+08:AGGREGATE
+|  group by: a.year, a.int_col
+|
+07:EXCHANGE [HASH(a.year,a.int_col)]
+|
+03:AGGREGATE [STREAMING]
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.year = b.year
+|  runtime filters: RF000 <- b.year
+|
+|--06:EXCHANGE [HASH(b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.year
+====
+# The input is partitioned by distinct exprs + grouping exprs
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+   functional.alltypes b on a.year = b.year and a.int_col = b.int_col group by a.year;
+---- QUERYOPTIONS
+# The input partition is compatible with grouping exprs + distinct exprs. Phase-1 merge
+# aggregation is skipped.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|  group by: a.year
+|
+07:EXCHANGE [HASH(a.year)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+03:AGGREGATE
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col, a.year = b.year
+|  runtime filters: RF000 <- b.int_col, RF001 <- b.year
+|
+|--06:EXCHANGE [HASH(b.int_col,b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col,a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col, RF001 -> a.year
+====
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+   functional.alltypes b on a.year = b.year and a.int_col = b.int_col group by a.year;
+---- QUERYOPTIONS
+# The input partition is not compatible with grouping exprs. Phase-1 merge aggregation is
+# executed.
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE [FINALIZE]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+08:AGGREGATE
+|  group by: a.year, a.int_col
+|
+07:EXCHANGE [HASH(a.year)]
+|
+03:AGGREGATE [STREAMING]
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col, a.year = b.year
+|  runtime filters: RF000 <- b.int_col, RF001 <- b.year
+|
+|--06:EXCHANGE [HASH(b.int_col,b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col,a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col, RF001 -> a.year
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/testdata/workloads/functional-query/queries/QueryTest/distinct.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/distinct.test b/testdata/workloads/functional-query/queries/QueryTest/distinct.test
index fb51584..0e65e53 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/distinct.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/distinct.test
@@ -370,3 +370,33 @@ SELECT COUNT(*) FROM
 ---- TYPES
 bigint
 ====
+---- QUERY
+# Distinct agg without a grouping expr and with a compatible child partition
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.int_col = b.int_col;
+---- RESULTS
+10
+---- TYPES
+bigint
+====
+---- QUERY
+# Distinct agg with a grouping expr. The input is partitioned by grouping exprs.
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.year = b.year group by a.year;
+---- RESULTS
+10
+10
+---- TYPES
+bigint
+====
+---- QUERY
+# Distinct agg with a grouping expr. The input is partitioned by grouping exprs and
+# distinct exprs.
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+   functional.alltypes b on a.year = b.year and a.int_col = b.int_col group by a.year;
+---- RESULTS
+10
+10
+---- TYPES
+bigint
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/tests/query_test/test_aggregation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 97c0e41..daaa741 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -23,6 +23,7 @@ from testdata.common import widetable
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
+    create_exec_option_dimension_from_dict,
     create_uncompressed_text_dimension)
 from tests.common.test_result_verifier import (
     assert_codegen_enabled,
@@ -198,14 +199,6 @@ class TestAggregationQueries(ImpalaTestSuite):
       pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results")
     self.run_test_case('QueryTest/aggregation', vector)
 
-  def test_distinct(self, vector):
-    if vector.get_value('table_format').file_format == 'hbase':
-      pytest.xfail("HBase returns columns in alphabetical order for select distinct *, "
-                   "making the result verication to fail.")
-    if vector.get_value('table_format').file_format == 'kudu':
-      pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed for kudu")
-    self.run_test_case('QueryTest/distinct', vector)
-
   def test_group_concat(self, vector):
     """group_concat distinct tests
        Required to run directly in python because the order in which results will be
@@ -339,6 +332,38 @@ class TestAggregationQueries(ImpalaTestSuite):
       for i in xrange(14, 16):
         self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)
 
+
+class TestDistinctAggregation(ImpalaTestSuite):
+  """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs
+  enabled and disabled."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestDistinctAggregation, cls).add_test_dimensions()
+
+    cls.ImpalaTestMatrix.add_dimension(
+      create_exec_option_dimension_from_dict({
+        'disable_codegen': [False, True],
+        'shuffle_distinct_exprs': [False, True]
+      }))
+
+    if cls.exploration_strategy() == 'core':
+      cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'text' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_distinct(self, vector):
+    if vector.get_value('table_format').file_format == 'hbase':
+      pytest.xfail("HBase returns columns in alphabetical order for select distinct *, "
+                   "making the result verication to fail.")
+    if vector.get_value('table_format').file_format == 'kudu':
+      pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed for kudu")
+    self.run_test_case('QueryTest/distinct', vector)
+
+
 class TestWideAggregationQueries(ImpalaTestSuite):
   """Test that aggregations with many grouping columns work"""
   @classmethod