You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/04/12 23:10:13 UTC

[1/6] impala git commit: IMPALA-6842: [DOCS] Remove disable_admission_control startup flag

Repository: impala
Updated Branches:
  refs/heads/master 15b388c5f -> 9a751f00b


IMPALA-6842: [DOCS] Remove disable_admission_control startup flag

Change-Id: Idbd15823308dbce5d2d00e79607e5ebbdab3e38f
Reviewed-on: http://gerrit.cloudera.org:8080/10046
Reviewed-by: Alex Rodoni <ar...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ead30cff
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ead30cff
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ead30cff

Branch: refs/heads/master
Commit: ead30cff35574a25151691ab24872f6efdeea658
Parents: 15b388c
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Apr 12 11:26:05 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Apr 12 18:51:06 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_admission.xml | 16 ----------------
 1 file changed, 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ead30cff/docs/topics/impala_admission.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_admission.xml b/docs/topics/impala_admission.xml
index f1359cf..5de246b 100644
--- a/docs/topics/impala_admission.xml
+++ b/docs/topics/impala_admission.xml
@@ -484,22 +484,6 @@ under the License.
                 <codeph>""</codeph> (empty string, meaning unlimited) </p>
             </dd>
           </dlentry>
-          <dlentry id="disable_admission_control">
-            <dt>
-              <codeph>disable_admission_control</codeph>
-            </dt>
-            <dd>
-              <indexterm audience="hidden">--disable_admission_control</indexterm>
-              <b>Purpose:</b> Turns off the admission control feature entirely,
-              regardless of other configuration option settings.
-              <p>
-                <b>Type:</b> Boolean </p>
-              <p>
-                <b>Default:</b>
-                <codeph>false</codeph>
-              </p>
-            </dd>
-          </dlentry>
           <dlentry id="disable_pool_max_requests">
             <dt>
               <codeph>disable_pool_max_requests</codeph>


[2/6] impala git commit: IMPALA-6480: [DOCS] DESCRIBE respects column-level privilege

Posted by ph...@apache.org.
IMPALA-6480: [DOCS] DESCRIBE respects column-level privilege

Cherry-picks: not for 2.x.

Change-Id: I094e00c2a4e8b19226e06afd8cf67968265edc4d
Reviewed-on: http://gerrit.cloudera.org:8080/9996
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/75b612ad
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/75b612ad
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/75b612ad

Branch: refs/heads/master
Commit: 75b612ad49e962052a6dcaf931b2a302e2c30413
Parents: ead30cf
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Wed Apr 11 13:27:43 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Apr 12 20:19:38 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_describe.xml | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/75b612ad/docs/topics/impala_describe.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_describe.xml b/docs/topics/impala_describe.xml
index 17d8875..b6b59ad 100644
--- a/docs/topics/impala_describe.xml
+++ b/docs/topics/impala_describe.xml
@@ -132,6 +132,21 @@ describe database extended default;
   If the <codeph>DATABASE</codeph> keyword is omitted, the default
   for the <codeph>DESCRIBE</codeph> statement is to refer to a table.
 </p>
+    <p>
+      If you have the <codeph>SELECT</codeph> privilege on a subset of the table
+      columns and no other relevant table/database/server-level privileges,
+      <codeph>DESCRIBE</codeph> returns the data from the columns you have
+      access to.
+    </p>
+
+    <p>
+      If you have the <codeph>SELECT</codeph> privilege on a subset of the table
+      columns and no other relevant table/database/server-level privileges,
+      <codeph>DESCRIBE FORMATTED/EXTENDED</codeph> does not return
+      the <codeph>LOCATION</codeph> field. The <codeph>LOCATION</codeph> data
+      is shown if you have any privilege on the table, the containing database
+      or the server.
+    </p>
 
 <codeblock>
 -- By default, the table is assumed to be in the current database.


[4/6] impala git commit: IMPALA-5814: Remove startup flag to disable admission control

Posted by ph...@apache.org.
IMPALA-5814: Remove startup flag to disable admission control

Remove "--disable admission control" startup flag and its related
functionality and usage.

Cherry-picks: not for 2.x

Change-Id: I9bf4087ce03ca63f82fd27c6d94b578881b85d42
Reviewed-on: http://gerrit.cloudera.org:8080/9964
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4438a85a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4438a85a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4438a85a

Branch: refs/heads/master
Commit: 4438a85a34eda96a23c01862589c89219de733fb
Parents: 318051c
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Jan 18 14:29:38 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Apr 12 21:05:22 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                     |  1 +
 be/src/runtime/coordinator.cc                     |  3 ++-
 be/src/runtime/exec-env.cc                        | 11 +++--------
 be/src/service/client-request-state.cc            | 11 +++++------
 bin/generate_minidump_collection_testdata.py      |  1 -
 tests/custom_cluster/test_admission_controller.py |  6 ++----
 6 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4438a85a/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index b62170d..a15e91b 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -246,3 +246,4 @@ REMOVED_FLAG(rpc_cnxn_retry_interval_ms);
 REMOVED_FLAG(staging_cgroup);
 REMOVED_FLAG(suppress_unknown_disk_id_warnings);
 REMOVED_FLAG(use_statestore);
+REMOVED_FLAG(disable_admission_control);

http://git-wip-us.apache.org/repos/asf/impala/blob/4438a85a/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 5a3de5d..bdd02eb 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -784,7 +784,8 @@ void Coordinator::ReleaseAdmissionControlResourcesLocked() {
             << PrintId(query_ctx().query_id);
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
-  if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
+  DCHECK(admission_controller != nullptr);
+  admission_controller->ReleaseQuery(schedule_);
   released_admission_control_resources_ = true;
   query_events_->MarkEvent("Released admission control resources");
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/4438a85a/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 57e1406..072baf3 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -79,7 +79,6 @@ DEFINE_int32(state_store_subscriber_port, 23000,
     "port where StatestoreSubscriberService should be exported");
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
-DEFINE_bool(disable_admission_control, false, "Disables admission control.");
 DEFINE_bool(use_krpc, true, "If true, use KRPC for the DataStream subsystem. "
     "Otherwise use Thrift RPC.");
 
@@ -185,12 +184,8 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
         request_pool_service_.get()));
   }
 
-  if (FLAGS_disable_admission_control) {
-    LOG(INFO) << "Admission control is disabled.";
-  } else {
-    admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(),
-        request_pool_service_.get(), metrics_.get(), backend_address_));
-  }
+  admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(),
+      request_pool_service_.get(), metrics_.get(), backend_address_));
   exec_env_ = this;
 }
 
@@ -365,7 +360,7 @@ Status ExecEnv::Init() {
   if (scheduler_ != nullptr) {
     RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_address_, ip_address_));
   }
-  if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init());
+  RETURN_IF_ERROR(admission_controller_->Init());
 
   // Get the fs.defaultFS value set in core-site.xml and assign it to configured_defaultFs
   TGetHadoopConfigRequest config_request;

http://git-wip-us.apache.org/repos/asf/impala/blob/4438a85a/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index a8b9bfa..f58eeb8 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -469,12 +469,11 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
-  if (exec_env_->admission_controller() != nullptr) {
-    status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
-    {
-      lock_guard<mutex> l(lock_);
-      RETURN_IF_ERROR(UpdateQueryStatus(status));
-    }
+  DCHECK(exec_env_->admission_controller() != nullptr);
+  status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
   coord_.reset(new Coordinator(*schedule_, query_events_));

http://git-wip-us.apache.org/repos/asf/impala/blob/4438a85a/bin/generate_minidump_collection_testdata.py
----------------------------------------------------------------------
diff --git a/bin/generate_minidump_collection_testdata.py b/bin/generate_minidump_collection_testdata.py
index 5e3d9fb..021941b 100755
--- a/bin/generate_minidump_collection_testdata.py
+++ b/bin/generate_minidump_collection_testdata.py
@@ -74,7 +74,6 @@ CONFIG_FILE = '''-beeswax_port=21000
 -catalog_service_host=host2.example.com
 -catalog_service_port=26000
 -local_library_dir=/var/lib/impala/udfs
--disable_admission_control=true
 -disk_spill_encryption=false
 -abort_on_config_error=true'''
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4438a85a/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index e5b4f99..ec32cce 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -145,8 +145,7 @@ def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
   if queue_wait_timeout_ms is not None:
     extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
   return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
-      "-default_pool_max_queued {1} -default_pool_mem_limit {2} "
-      "-disable_admission_control=false {3}".format(
+      "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
       max_requests, max_queued, pool_max_mem, extra_flags))
 
 
@@ -156,8 +155,7 @@ def impalad_admission_ctrl_config_args(additional_args=""):
   fs_allocation_path = os.path.join(resources_dir, "fair-scheduler-test2.xml")
   llama_site_path = os.path.join(resources_dir, "llama-site-test2.xml")
   return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s "
-        "-llama_site_path %s -disable_admission_control=false %s" %\
-        (fs_allocation_path, llama_site_path, additional_args))
+        "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path, additional_args))
 
 def log_metrics(log_prefix, metrics):
   LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\


[3/6] impala git commit: IMPALA-2717: fix output of formatted unicode to non-TTY

Posted by ph...@apache.org.
IMPALA-2717: fix output of formatted unicode to non-TTY

The bug is that PrettyOutputFormatter.format() returned a unicode
object, and Python cannot automatically write unicode objects to
output streams where there is no default encoding.

The fix is to convert to UTF-8 encoded in a regular string, which
can be output to any output device. This makes the output type
consistent with DelimitedOutputFormatter.format().

Based on code by Marcell Szabo.

Testing:
Added a basic test.

Played around in an interactive shell to make sure that unicode
characters still work in interactive mode.

Change-Id: I9de641ecf767a2feef3b9f48b344ef2d55e17a7f
Reviewed-on: http://gerrit.cloudera.org:8080/9928
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/318051cc
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/318051cc
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/318051cc

Branch: refs/heads/master
Commit: 318051cc21cc7fbe96886e30b3f13b90bbb7b50a
Parents: 75b612a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 4 11:51:51 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Apr 12 20:34:47 2018 +0000

----------------------------------------------------------------------
 shell/impala_shell.py                 |  4 +++-
 shell/shell_output.py                 |  8 +++++++-
 tests/shell/test_shell_commandline.py | 25 +++++++++++++++++++++----
 3 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/318051cc/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 93bdafb..55ea692 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -70,7 +70,9 @@ class CmdStatus:
   ERROR = False
 
 class ImpalaPrettyTable(prettytable.PrettyTable):
-  """Patched version of PrettyTable that TODO"""
+  """Patched version of PrettyTable with different unicode handling - instead of throwing
+  exceptions when a character can't be converted to unicode, it is replaced with a
+  placeholder character."""
   def _unicode(self, value):
     if not isinstance(value, basestring):
       value = str(value)

http://git-wip-us.apache.org/repos/asf/impala/blob/318051cc/shell/shell_output.py
----------------------------------------------------------------------
diff --git a/shell/shell_output.py b/shell/shell_output.py
index f0cecc8..8ab3bee 100644
--- a/shell/shell_output.py
+++ b/shell/shell_output.py
@@ -28,11 +28,16 @@ class PrettyOutputFormatter(object):
     self.prettytable = prettytable
 
   def format(self, rows):
+    """Returns string containing UTF-8-encoded representation of the table data."""
     # Clear rows that already exist in the table.
     self.prettytable.clear_rows()
     try:
       map(self.prettytable.add_row, rows)
-      return self.prettytable.get_string()
+      # PrettyTable.get_string() converts UTF-8-encoded strs added via add_row() into
+      # Python unicode strings. We need to convert it back to a UTF-8-encoded str for
+      # output, since Python won't do the encoding automatically when outputting to a
+      # non-terminal (see IMPALA-2717).
+      return self.prettytable.get_string().encode('utf-8')
     except Exception, e:
       # beeswax returns each row as a tab separated string. If a string column
       # value in a row has tabs, it will break the row split. Default to displaying
@@ -53,6 +58,7 @@ class DelimitedOutputFormatter(object):
         raise ValueError, error_msg
 
   def format(self, rows):
+    """Returns string containing UTF-8-encoded representation of the table data."""
     # csv.writer expects a file handle to the input.
     # cStringIO is used as the temporary buffer.
     temp_buffer = StringIO()

http://git-wip-us.apache.org/repos/asf/impala/blob/318051cc/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index e69c512..6aa05f6 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -33,6 +33,8 @@ from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell
 DEFAULT_QUERY = 'select 1'
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
+RUSSIAN_CHARS = (u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р,"
+                 u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я")
 
 @pytest.fixture
 def empty_table(unique_database, request):
@@ -405,12 +407,27 @@ class TestImpalaShell(ImpalaTestSuite):
 
   def test_international_characters(self):
     """Sanity test to ensure that the shell can read international characters."""
-    russian_chars = (u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р,"
-                     u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я")
-    args = """-B -q "select '%s'" """ % russian_chars
+    args = """-B -q "select '%s'" """ % RUSSIAN_CHARS
     result = run_impala_shell_cmd(args.encode('utf-8'))
     assert 'UnicodeDecodeError' not in result.stderr
-    assert russian_chars.encode('utf-8') in result.stdout
+    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
+
+  def test_international_characters_prettyprint(self):
+    """IMPALA-2717: ensure we can handle international characters in pretty-printed
+    output"""
+    args = """-q "select '%s'" """ % RUSSIAN_CHARS
+    result = run_impala_shell_cmd(args.encode('utf-8'))
+    assert 'UnicodeDecodeError' not in result.stderr
+    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
+
+  def test_international_characters_prettyprint_tabs(self):
+    """IMPALA-2717: ensure we can handle international characters in pretty-printed
+    output when pretty-printing falls back to delimited output."""
+    args = """-q "select '%s\\t'" """ % RUSSIAN_CHARS
+    result = run_impala_shell_cmd(args.encode('utf-8'))
+    assert 'Reverting to tab delimited text' in result.stderr
+    assert 'UnicodeDecodeError' not in result.stderr
+    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
 
   @pytest.mark.execute_serially  # This tests invalidates metadata, and must run serially
   def test_config_file(self):


[5/6] impala git commit: Bump Kudu version to a954418

Posted by ph...@apache.org.
Bump Kudu version to a954418

Change-Id: Ib06c0fb3c24a8cee1dd4f34a221cf41a711a5359
Reviewed-on: http://gerrit.cloudera.org:8080/9982
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bc466c22
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bc466c22
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bc466c22

Branch: refs/heads/master
Commit: bc466c226f5f39014cf3fa2b6b88c0427ee3ec8a
Parents: 4438a85
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Apr 10 23:11:27 2018 +0000
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Apr 12 21:22:34 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bc466c22/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 343610f..88a5dda 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -79,7 +79,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=88-8e37339d45
+export IMPALA_TOOLCHAIN_BUILD_ID=102-02a8e245df
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -167,7 +167,7 @@ if [[ $OSTYPE == "darwin"* ]]; then
 fi
 
 # Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
-export IMPALA_KUDU_VERSION=0eef8e0
+export IMPALA_KUDU_VERSION=a954418
 unset IMPALA_KUDU_URL
 
 


[6/6] impala git commit: IMPALA-6822: Add a query option to control shuffling by distinct exprs

Posted by ph...@apache.org.
IMPALA-6822: Add a query option to control shuffling by distinct exprs

IMPALA-4794 changed the distinct aggregation behavior to shuffling by
both grouping exprs and the distinct expr. It's slower in queries
where the NDVs of grouping exprs are high and data are uniformly
distributed among groups. This patch adds a query option controlling
this behavior, letting users switch to the old plan.

Change-Id: Icb4b4576fb29edd62cf4b4ba0719c0e0a2a5a8dc
Reviewed-on: http://gerrit.cloudera.org:8080/9949
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9a751f00
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9a751f00
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9a751f00

Branch: refs/heads/master
Commit: 9a751f00b8a399116c12a81e130a696b01eb1ba8
Parents: bc466c2
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Fri Apr 6 17:43:37 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Apr 12 22:01:35 2018 +0000

----------------------------------------------------------------------
 be/src/service/query-options.cc                 |   5 +
 be/src/service/query-options.h                  |   6 +-
 common/thrift/ImpalaInternalService.thrift      |   7 +
 common/thrift/ImpalaService.thrift              |   7 +
 .../impala/planner/DistributedPlanner.java      |  48 ++-
 .../org/apache/impala/planner/PlannerTest.java  |   5 +
 .../PlannerTest/shuffle-by-distinct-exprs.test  | 329 +++++++++++++++++++
 .../queries/QueryTest/distinct.test             |  30 ++
 tests/query_test/test_aggregation.py            |  41 ++-
 9 files changed, 454 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 3c56f89..b219a00 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -632,6 +632,11 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_exec_time_limit_s(time_limit);
         break;
       }
+      case TImpalaQueryOptions::SHUFFLE_DISTINCT_EXPRS: {
+        query_options->__set_shuffle_distinct_exprs(
+                iequals(value, "true") || iequals(value, "1"));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 2280cff..82e04a1 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::EXEC_TIME_LIMIT_S + 1);\
+      TImpalaQueryOptions::SHUFFLE_DISTINCT_EXPRS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
@@ -129,6 +129,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(shuffle_distinct_exprs, SHUFFLE_DISTINCT_EXPRS,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
@@ -149,7 +151,7 @@ std::string DebugQueryOptions(const TQueryOptions& query_options);
 
 /// Bitmask for the values of TQueryOptions.
 /// TODO: Find a way to set the size based on the number of fields.
-typedef std::bitset<64> QueryOptionsMask;
+typedef std::bitset<65> QueryOptionsMask;
 
 /// Updates the query options in dst from those in src where the query option is set
 /// (i.e. src->__isset.PROPERTY is true) and the corresponding bit in mask is set. If

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index e96ed87..8cbc573 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -272,6 +272,13 @@ struct TQueryOptions {
   // not include time spent in planning, scheduling or admission control. A value of 0
   // means no time limit.
   63: optional i32 exec_time_limit_s = 0;
+
+  // When a query has both grouping and distinct exprs, impala can optionally include the
+  // distinct exprs in the hash exchange of the first aggregation phase to spread the data
+  // among more nodes. However, this plan requires another hash exchange on the grouping
+  // exprs in the second phase which is not required when omitting the distinct exprs in
+  // the first phase. Shuffling by both is better if the grouping exprs have low NDVs.
+  64: optional bool shuffle_distinct_exprs = true;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 356f5e5..e25bd60 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -283,6 +283,13 @@ enum TImpalaQueryOptions {
   // not include time spent in planning, scheduling or admission control. A value of 0
   // means no time limit.
   EXEC_TIME_LIMIT_S,
+
+  // When a query has both grouping and distinct exprs, impala can optionally include the
+  // distinct exprs in the hash exchange of the first aggregation phase to spread the data
+  // among more nodes. However, this plan requires another hash exchange on the grouping
+  // exprs in the second phase which is not required when omitting the distinct exprs in
+  // the first phase. Shuffling by both is better if the grouping exprs have low NDVs.
+  SHUFFLE_DISTINCT_EXPRS,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 241a71e..b388673 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -883,27 +883,46 @@ public class DistributedPlanner {
   private PlanFragment createPhase2DistinctAggregationFragment(
       AggregationNode phase2AggNode, PlanFragment childFragment,
       ArrayList<PlanFragment> fragments) throws ImpalaException {
+    // When a query has both grouping and distinct exprs, impala can optionally include
+    // the distinct exprs in the hash exchange of the first aggregation phase to spread
+    // the data among more nodes. However, this plan requires another hash exchange on the
+    // grouping exprs in the second phase which is not required when omitting the distinct
+    // exprs in the first phase. Shuffling by both is better if the grouping exprs have
+    // low NDVs.
+    boolean shuffleDistinctExprs = ctx_.getQueryOptions().shuffle_distinct_exprs ||
+        phase2AggNode.getAggInfo().getGroupingExprs().isEmpty();
     // The phase-1 aggregation node is already in the child fragment.
     Preconditions.checkState(phase2AggNode.getChild(0) == childFragment.getPlanRoot());
 
     AggregateInfo phase1AggInfo = ((AggregationNode) phase2AggNode.getChild(0))
         .getAggInfo();
-    // We need to do
-    // - child fragment:
-    //   * phase-1 aggregation
-    // - first merge fragment, hash-partitioned on grouping and distinct exprs:
-    //   * merge agg of phase-1
-    //   * phase-2 agg
-    // - second merge fragment, partitioned on grouping exprs or unpartitioned
-    //   without grouping exprs
-    //   * merge agg of phase-2
+    ArrayList<Expr> partitionExprs;
     // With grouping, the output partition exprs of the child are the (input) grouping
     // exprs of the parent. The grouping exprs reference the output tuple of phase-1
     // but the partitioning happens on the intermediate tuple of the phase-1.
-    ArrayList<Expr> partitionExprs = Expr.substituteList(
-        phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(),
-        ctx_.getRootAnalyzer(), false);
-
+    if (shuffleDistinctExprs) {
+      // We need to do
+      // - child fragment:
+      //   * phase-1 aggregation
+      // - first merge fragment, hash-partitioned on grouping and distinct exprs:
+      //   * merge agg of phase-1
+      //   * phase-2 agg
+      // - second merge fragment, partitioned on grouping exprs or unpartitioned
+      //   without grouping exprs
+      //   * merge agg of phase-2
+      partitionExprs = Expr.substituteList(
+          phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(),
+          ctx_.getRootAnalyzer(), false);
+    } else {
+      // We need to do
+      // - child fragment:
+      //   * phase-1 aggregation
+      // - merge fragment, hash-partitioned on grouping exprs:
+      //   * merge agg of phase-1
+      //   * phase-2 agg
+      partitionExprs = Expr.substituteList(phase2AggNode.getAggInfo().getGroupingExprs(),
+          phase1AggInfo.getOutputToIntermediateSmap(), ctx_.getRootAnalyzer(), false);
+    }
     PlanFragment firstMergeFragment;
     boolean childHasCompatPartition = ctx_.getRootAnalyzer().setsHaveValueTransfer(
         partitionExprs, childFragment.getDataPartition().getPartitionExprs(), true);
@@ -932,8 +951,9 @@ public class DistributedPlanner {
       // if there is a limit, it had already been placed with the phase-2 aggregation
       // step (which is where it should be)
       firstMergeFragment.addPlanRoot(phase2AggNode);
-      fragments.add(firstMergeFragment);
+      if (shuffleDistinctExprs) fragments.add(firstMergeFragment);
     }
+    if (!shuffleDistinctExprs) return firstMergeFragment;
     phase2AggNode.unsetNeedsFinalize();
     phase2AggNode.setIntermediateTuple();
     // Limit should be applied at the final merge aggregation node

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5dbba75..8b9166f 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -75,6 +75,11 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testShuffleByDistinctExprs() {
+    runPlannerTestFile("shuffle-by-distinct-exprs");
+  }
+
+  @Test
   public void testAggregation() {
     runPlannerTestFile("aggregation");
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test b/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
new file mode 100644
index 0000000..74f09c3
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
@@ -0,0 +1,329 @@
+# Distinct agg without a grouping expr
+select count(distinct int_col) from functional.alltypes;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: count(int_col)
+|
+04:AGGREGATE
+|  group by: int_col
+|
+03:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+select count(distinct int_col) from functional.alltypes;
+---- QUERYOPTIONS
+# Distinct exprs in a aggregation without grouping is always shuffled by. Setting it to
+# true doesn't affect the plan.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: count(int_col)
+|
+04:AGGREGATE
+|  group by: int_col
+|
+03:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Distinct agg with a grouping expr
+select count(distinct int_col) from functional.alltypes group by year;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(int_col)
+|  group by: year
+|
+04:AGGREGATE
+|  group by: year, int_col
+|
+03:EXCHANGE [HASH(year)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: year, int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+select count(distinct int_col) from functional.alltypes group by year;
+---- QUERYOPTIONS
+# Shuffling by distinct exprs will create 1 more exchange node and 1 more agg node.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: year
+|
+05:EXCHANGE [HASH(year)]
+|
+02:AGGREGATE [STREAMING]
+|  output: count(int_col)
+|  group by: year
+|
+04:AGGREGATE
+|  group by: year, int_col
+|
+03:EXCHANGE [HASH(year,int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: year, int_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Distinct agg without a grouping expr and with a compatible child partition
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.int_col = b.int_col;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE
+|  output: count(a.int_col)
+|
+03:AGGREGATE
+|  group by: a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|
+|--06:EXCHANGE [HASH(b.int_col)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col
+====
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.int_col = b.int_col;
+---- QUERYOPTIONS
+# Distinct exprs in a aggregation without grouping is always shuffled by. Setting it to
+# true doesn't affect the plan.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE
+|  output: count(a.int_col)
+|
+03:AGGREGATE
+|  group by: a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|
+|--06:EXCHANGE [HASH(b.int_col)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col
+====
+# Distinct agg with a grouping expr and a compatible child partition
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.year = b.year group by a.year;
+---- QUERYOPTIONS
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE [FINALIZE]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+03:AGGREGATE
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.year = b.year
+|  runtime filters: RF000 <- b.year
+|
+|--06:EXCHANGE [HASH(b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.year
+====
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.year = b.year group by a.year;
+---- QUERYOPTIONS
+# Shuffling by distinct exprs will create 2 more exchange nodes and 2 more agg nodes.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+11:EXCHANGE [UNPARTITIONED]
+|
+10:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|  group by: a.year
+|
+09:EXCHANGE [HASH(a.year)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+08:AGGREGATE
+|  group by: a.year, a.int_col
+|
+07:EXCHANGE [HASH(a.year,a.int_col)]
+|
+03:AGGREGATE [STREAMING]
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.year = b.year
+|  runtime filters: RF000 <- b.year
+|
+|--06:EXCHANGE [HASH(b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.year
+====
+# The input is partitioned by distinct exprs + grouping exprs
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+   functional.alltypes b on a.year = b.year and a.int_col = b.int_col group by a.year;
+---- QUERYOPTIONS
+# The input partition is compatible with grouping exprs + distinct exprs. Phase-1 merge
+# aggregation is skipped.
+SHUFFLE_DISTINCT_EXPRS=true
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(a.int_col)
+|  group by: a.year
+|
+07:EXCHANGE [HASH(a.year)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+03:AGGREGATE
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col, a.year = b.year
+|  runtime filters: RF000 <- b.int_col, RF001 <- b.year
+|
+|--06:EXCHANGE [HASH(b.int_col,b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col,a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col, RF001 -> a.year
+====
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+   functional.alltypes b on a.year = b.year and a.int_col = b.int_col group by a.year;
+---- QUERYOPTIONS
+# The input partition is not compatible with grouping exprs. Phase-1 merge aggregation is
+# executed.
+SHUFFLE_DISTINCT_EXPRS=false
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+04:AGGREGATE [FINALIZE]
+|  output: count(a.int_col)
+|  group by: a.year
+|
+08:AGGREGATE
+|  group by: a.year, a.int_col
+|
+07:EXCHANGE [HASH(a.year)]
+|
+03:AGGREGATE [STREAMING]
+|  group by: a.year, a.int_col
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = b.int_col, a.year = b.year
+|  runtime filters: RF000 <- b.int_col, RF001 <- b.year
+|
+|--06:EXCHANGE [HASH(b.int_col,b.year)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:EXCHANGE [HASH(a.int_col,a.year)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.int_col, RF001 -> a.year
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/testdata/workloads/functional-query/queries/QueryTest/distinct.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/distinct.test b/testdata/workloads/functional-query/queries/QueryTest/distinct.test
index fb51584..0e65e53 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/distinct.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/distinct.test
@@ -370,3 +370,33 @@ SELECT COUNT(*) FROM
 ---- TYPES
 bigint
 ====
+---- QUERY
+# Distinct agg without a grouping expr and with a compatible child partition
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.int_col = b.int_col;
+---- RESULTS
+10
+---- TYPES
+bigint
+====
+---- QUERY
+# Distinct agg with a grouping expr. The input is partitioned by grouping exprs.
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+    functional.alltypes b on a.year = b.year group by a.year;
+---- RESULTS
+10
+10
+---- TYPES
+bigint
+====
+---- QUERY
+# Distinct agg with a grouping expr. The input is partitioned by grouping exprs and
+# distinct exprs.
+select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
+   functional.alltypes b on a.year = b.year and a.int_col = b.int_col group by a.year;
+---- RESULTS
+10
+10
+---- TYPES
+bigint
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/9a751f00/tests/query_test/test_aggregation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 97c0e41..daaa741 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -23,6 +23,7 @@ from testdata.common import widetable
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
+    create_exec_option_dimension_from_dict,
     create_uncompressed_text_dimension)
 from tests.common.test_result_verifier import (
     assert_codegen_enabled,
@@ -198,14 +199,6 @@ class TestAggregationQueries(ImpalaTestSuite):
       pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results")
     self.run_test_case('QueryTest/aggregation', vector)
 
-  def test_distinct(self, vector):
-    if vector.get_value('table_format').file_format == 'hbase':
-      pytest.xfail("HBase returns columns in alphabetical order for select distinct *, "
-                   "making the result verication to fail.")
-    if vector.get_value('table_format').file_format == 'kudu':
-      pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed for kudu")
-    self.run_test_case('QueryTest/distinct', vector)
-
   def test_group_concat(self, vector):
     """group_concat distinct tests
        Required to run directly in python because the order in which results will be
@@ -339,6 +332,38 @@ class TestAggregationQueries(ImpalaTestSuite):
       for i in xrange(14, 16):
         self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)
 
+
+class TestDistinctAggregation(ImpalaTestSuite):
+  """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs
+  enabled and disabled."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestDistinctAggregation, cls).add_test_dimensions()
+
+    cls.ImpalaTestMatrix.add_dimension(
+      create_exec_option_dimension_from_dict({
+        'disable_codegen': [False, True],
+        'shuffle_distinct_exprs': [False, True]
+      }))
+
+    if cls.exploration_strategy() == 'core':
+      cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'text' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_distinct(self, vector):
+    if vector.get_value('table_format').file_format == 'hbase':
+      pytest.xfail("HBase returns columns in alphabetical order for select distinct *, "
+                   "making the result verication to fail.")
+    if vector.get_value('table_format').file_format == 'kudu':
+      pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed for kudu")
+    self.run_test_case('QueryTest/distinct', vector)
+
+
 class TestWideAggregationQueries(ImpalaTestSuite):
   """Test that aggregations with many grouping columns work"""
   @classmethod