You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/05/01 15:45:02 UTC

[1/4] impala git commit: Update version to 2.13.0-SNAPSHOT

Repository: impala
Updated Branches:
  refs/heads/2.x 9c32594f7 -> 955ad0833


Update version to 2.13.0-SNAPSHOT

Change-Id: Iac447c57a273e0c334e0e63d6e740c2a00d5c762
Reviewed-on: http://gerrit.cloudera.org:8080/10241
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1dc6dc32
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1dc6dc32
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1dc6dc32

Branch: refs/heads/2.x
Commit: 1dc6dc32021e1fd74028979ee634ac2cd8cd1357
Parents: 9c32594
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Mon Apr 30 10:04:59 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Apr 30 22:06:29 2018 +0000

----------------------------------------------------------------------
 bin/save-version.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1dc6dc32/bin/save-version.sh
----------------------------------------------------------------------
diff --git a/bin/save-version.sh b/bin/save-version.sh
index 9ead5be..fc820fc 100755
--- a/bin/save-version.sh
+++ b/bin/save-version.sh
@@ -21,7 +21,7 @@
 # Note: for internal (aka pre-release) versions, the version should have
 # "-INTERNAL" appended. Parts of the code will look for this to distinguish
 # between released and internal versions.
-VERSION=2.12.0-SNAPSHOT
+VERSION=2.13.0-SNAPSHOT
 GIT_HASH=$(git rev-parse HEAD 2> /dev/null)
 if [ -z $GIT_HASH ]
 then


[2/4] impala git commit: IMPALA-6070: Further improvements to test-with-docker.

Posted by mi...@apache.org.
IMPALA-6070: Further improvements to test-with-docker.

This commit tackles a few additions and improvements to
test-with-docker. In general, I'm adding workloads (e.g., exhaustive,
rat-check), tuning memory setting and parallelism, and trying to speed
things up.

Bug fixes:

* Embarassingly, I was still skipping thrift-server-test in the backend
  tests. This was a mistake in handling feedback from my last review.

* I made the timeline a little bit taller to clip less.

Adding workloads:

* I added the RAT licensing check.

* I added exhaustive runs. This led me to model the suites a little
  bit more in Python, with a class representing a suite with a
  bunch of data about the suite. It's not perfect and still
  coupled with the entrypoint.sh shell script, but it feels
  workable. As part of adding exhaustive tests, I had
  to re-work the timeout handling, since now different
  suites meaningfully have different timeouts.

Speed ups:

* To speed up test runs, I added a mechanism to split py.test suites into
  multiple shards with a py.test argument. This involved a little bit of work in
  conftest.py, and exposing $RUN_CUSTOM_CLUSTER_TESTS_ARGS in run-all-tests.sh.

  Furthermore, I moved a bit more logic about managing the
  list of suites into Python.

* Doing the full build with "-notests" and only building
  the backend tests in the relevant target that needs them. This speeds
  up "docker commit" significantly by removing about 20GB from the
  container.  I had to indicates that expr-codegen-test depends on
  expr-codegen-test-ir, which was missing.

* I sped up copying the Kudu data: previously I did
  both a move and a copy; now I'm doing a move followed by a move. One
  of the moves is cross-filesystem so is slow, but this does half the
  amount of copying.

Memory usage:

* I tweaked the memlimit_gb settings to have a higher default. I've been
  fighting empirically to have the tests run well on c4.8xlarge and
  m4.10xlarge.

The more memory a minicluster and test suite run uses, the fewer parallel
suites we can run. By observing the peak processes at the tail of a run (with a
new "memory_usage" function that uses a ps/sort/awk trick) and by observing
peak container total_rss, I found that we had several JVMs that
didn't have Xmx settings set. I added Xms/Xmx settings in a few
places:

 * The non-first Impalad does very little JVM work, so having
   an Xmx keeps it small, even in the parallel tests.
 * Datanodes do work, but they essentially were never garbage
   collecting, because JVM defaults let them use up to 1/4th
   the machine memory. (I observed this based on RSS at the
   end of the run; nothing fancier.) Adding Xms/Xmx settings
   helped.
 * Similarly, I piped the settings through to HBase.

A few daemons still run without resource limitations, but they don't
seem to be a problem.

Change-Id: I43fe124f00340afa21ad1eeb6432d6d50151ca7c
Reviewed-on: http://gerrit.cloudera.org:8080/10123
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10248
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d733ea68
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d733ea68
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d733ea68

Branch: refs/heads/2.x
Commit: d733ea68ca144798ff67054faf577dfdae0f201e
Parents: 1dc6dc3
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Fri Apr 6 10:16:39 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue May 1 02:46:38 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/CMakeLists.txt                     |   1 +
 bin/run-all-tests.sh                            |   5 +-
 docker/entrypoint.sh                            | 173 +++++++----
 docker/monitor.py                               |  29 +-
 docker/test-with-docker.py                      | 296 ++++++++++++++-----
 docker/timeline.html.template                   |   9 +-
 testdata/bin/run-hbase.sh                       |   1 +
 .../common/etc/init.d/hdfs-common               |   7 +
 tests/conftest.py                               |  35 +++
 tests/run-tests.py                              |  19 +-
 10 files changed, 426 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index cff391c..755c166 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -72,5 +72,6 @@ ADD_BE_TEST(expr-codegen-test)
 # expr-codegen-test includes test IR functions
 COMPILE_TO_IR(expr-codegen-test.cc)
 add_dependencies(expr-codegen-test-ir gen-deps)
+add_dependencies(expr-codegen-test expr-codegen-test-ir)
 
 ADD_UDF_TEST(aggregate-functions-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/bin/run-all-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh
index 4743a4f..7702134 100755
--- a/bin/run-all-tests.sh
+++ b/bin/run-all-tests.sh
@@ -57,6 +57,8 @@ fi
 : ${TEST_START_CLUSTER_ARGS:=}
 # Extra args to pass to run-tests.py
 : ${RUN_TESTS_ARGS:=}
+# Extra args to pass to run-custom-cluster-tests.sh
+: ${RUN_CUSTOM_CLUSTER_TESTS_ARGS:=}
 if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
   # TODO: Remove abort_on_config_error flag from here and create-load-data.sh once
   # checkConfiguration() accepts the local filesystem (see IMPALA-1850).
@@ -223,7 +225,8 @@ do
     # Run the custom-cluster tests after all other tests, since they will restart the
     # cluster repeatedly and lose state.
     # TODO: Consider moving in to run-tests.py.
-    if ! "${IMPALA_HOME}/tests/run-custom-cluster-tests.sh" ${COMMON_PYTEST_ARGS}; then
+    if ! "${IMPALA_HOME}/tests/run-custom-cluster-tests.sh" ${COMMON_PYTEST_ARGS} \
+        ${RUN_CUSTOM_CLUSTER_TESTS_ARGS}; then
       TEST_RET_CODE=1
     fi
     export IMPALA_MAX_LOG_FILES="${IMPALA_MAX_LOG_FILES_SAVE}"

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/entrypoint.sh
----------------------------------------------------------------------
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index d371d25..00b2ee6 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -90,12 +90,6 @@ function impala_environment() {
 function boot_container() {
   pushd /home/impdev/Impala
 
-  # Required for metastore
-  sudo service postgresql start
-
-  # Required for starting HBase
-  sudo service ssh start
-
   # Make log directories. This is typically done in buildall.sh.
   mkdir -p logs/be_tests logs/fe_tests/coverage logs/ee_tests logs/custom_cluster_tests
 
@@ -112,17 +106,37 @@ function boot_container() {
   echo Hosts file:
   cat /etc/hosts
 
-  # Make a copy of Kudu's WALs to avoid isue with Docker filesystems (aufs and
+  popd
+}
+
+function start_minicluster {
+  # The subshell here avoids the verbose output from set -x.
+  (echo ">>> Starting PostgreSQL and SSH") 2> /dev/null
+  pushd /home/impdev/Impala
+
+  # Required for metastore
+  sudo service postgresql start
+
+  # Required for starting HBase
+  sudo service ssh start
+
+  (echo ">>> Copying Kudu Data") 2> /dev/null
+  # Move around Kudu's WALs to avoid issue with Docker filesystems (aufs and
   # overlayfs) that don't support os.rename(2) on directories, which Kudu
   # requires. We make a fresh copy of the data, in which case rename(2) works
   # presumably because there's only one layer involved. See
   # https://issues.apache.org/jira/browse/KUDU-1419.
-  cd /home/impdev/Impala/testdata
+  set -x
+  pushd /home/impdev/Impala/testdata
   for x in cluster/cdh*/node-*/var/lib/kudu/*/wal; do
+    echo $x
+    # This mv takes time, as it's actually copying into the latest layer.
     mv $x $x-orig
-    cp -r $x-orig $x
-    rm -r $x-orig
+    mkdir $x
+    mv $x-orig/* $x
+    rmdir $x-orig
   done
+  popd
 
   # Wait for postgresql to really start; if it doesn't, Hive Metastore will fail to start.
   for i in {1..120}; do
@@ -135,6 +149,9 @@ function boot_container() {
   done
   sudo -u postgres psql -c "select 1"
 
+  (echo ">>> Starting mini cluster") 2> /dev/null
+  testdata/bin/run-all.sh
+
   popd
 }
 
@@ -164,8 +181,13 @@ function build_impdev() {
 
   # Builds Impala and loads test data.
   # Note that IMPALA-6494 prevents us from using shared library builds,
-  # which are smaller and thereby speed things up.
-  ./buildall.sh -noclean -format -testdata -skiptests
+  # which are smaller and thereby speed things up. We use "-notests"
+  # to avoid building backend tests, which are sizable, and
+  # can be built when executing those tests.
+  ./buildall.sh -noclean -format -testdata -notests
+
+  # Dump current memory usage to logs, before shutting things down.
+  memory_usage
 
   # Shut down things cleanly.
   testdata/bin/kill-all.sh
@@ -176,9 +198,34 @@ function build_impdev() {
   # Clean up things we don't need to reduce image size
   find be -name '*.o' -execdir rm '{}' + # ~1.6GB
 
+  # Clean up dangling symlinks. These (typically "cluster/cdh*-node-*")
+  # may point to something inside a container that no longer exists
+  # and can confuse Jenkins.
+  find /logs -xtype l -execdir rm '{}' ';'
+
   popd
 }
 
+# Prints top 20 RSS consumers (and other, total), in megabytes Common culprits
+# are Java processes without Xmx set. Since most things don't reclaim memory,
+# this is a decent proxy for peak memory usage by long-lived processes.
+function memory_usage() {
+  (
+  echo "Top 20 memory consumers (RSS in MBs)"
+  sudo ps -axho rss,args | \
+    sed -e 's/^ *//' | \
+    sed -e 's, ,\t,' | \
+    sort -nr | \
+    awk -F'\t' '
+    FNR < 20 { print $1/1024.0, $2; total += $1/1024.0 }
+    FNR >= 20 { other+= $1/1024.0; total += $1/1024.0 }
+    END {
+      if (other) { print other, "-- other --" };
+      print total, "-- total --"
+    }'
+  ) >& /logs/memory_usage.txt
+}
+
 # Runs a suite passed in as the first argument. Tightly
 # coupled with Impala's run-all-tests and the suite names.
 # from test-with-docker.py.
@@ -189,6 +236,8 @@ function test_suite() {
 
   # These test suites are for testing.
   if [[ $1 == NOOP ]]; then
+    # Sleep busily for 10 seconds.
+    bash -c 'while [[ $SECONDS -lt 10 ]]; do :; done'
     return 0
   fi
   if [[ $1 == NOOP_FAIL ]]; then
@@ -208,31 +257,9 @@ function test_suite() {
   boot_container
   impala_environment
 
-  # By default, the JVM will use 1/4 of your OS memory for its heap size. For a
-  # long-running test, this will delay GC inside of impalad's leading to
-  # unnecessarily large process RSS footprints. We cap the heap size at
-  # a more reasonable size.  Note that "test_insert_large_string" fails
-  # at 2g and 3g, so the suite that includes it (EE_TEST_PARALLEL) gets
-  # additional memory.
-  #
-  # Similarly, bin/start-impala-cluster typically configures the memlimit
-  # to be 80% of the machine memory, divided by the number of daemons.
-  # If multiple containers are to be run simultaneously, this is scaled
-  # down in test-with-docker.py (and further configurable with --impalad-mem-limit-bytes)
-  # and passed in via $IMPALAD_MEM_LIMIT_BYTES to the container. There is a
-  # relationship between the number of parallel tests that can be run by py.test and this
-  # limit.
-  JVM_HEAP_GB=2
-  if [[ $1 = EE_TEST_PARALLEL ]]; then
-    JVM_HEAP_GB=4
-  fi
-  export TEST_START_CLUSTER_ARGS="--jvm_args=-Xmx${JVM_HEAP_GB}g \
-    --impalad_args=--mem_limit=$IMPALAD_MEM_LIMIT_BYTES"
-
   # BE tests don't require the minicluster, so we can run them directly.
   if [[ $1 = BE_TEST ]]; then
-    # IMPALA-6494: thrift-server-test fails in Ubuntu16.04 for the moment; skip it.
-    export SKIP_BE_TEST_PATTERN='thrift-server-test*'
+    make -j$(nproc) --load-average=$(nproc) be-test be-benchmarks
     if ! bin/run-backend-tests.sh; then
       echo "Tests $1 failed!"
       return 1
@@ -242,37 +269,73 @@ function test_suite() {
     fi
   fi
 
+  if [[ $1 == RAT_CHECK ]]; then
+    # Runs Apache RAT (a license checker)
+    git archive --prefix=rat/ -o rat-impala.zip HEAD
+    wget --quiet https://archive.apache.org/dist/creadur/apache-rat-0.12/apache-rat-0.12-bin.tar.gz
+    tar xzf apache-rat-0.12-bin.tar.gz
+    java -jar apache-rat-0.12/apache-rat-0.12.jar -x rat-impala.zip > logs/rat.xml
+    bin/check-rat-report.py bin/rat_exclude_files.txt logs/rat.xml
+    return $?
+  fi
+
   # Start the minicluster
-  testdata/bin/run-all.sh
+  start_minicluster
 
-  export MAX_PYTEST_FAILURES=0
-  # Choose which suite to run; this is how run-all.sh chooses between them.
-  export FE_TEST=false
-  export BE_TEST=false
-  export EE_TEST=false
-  export JDBC_TEST=false
-  export CLUSTER_TEST=false
-
-  eval "export ${1}=true"
-
-  if [[ ${1} = "EE_TEST_SERIAL" ]]; then
-    # We bucket the stress tests with the parallel tests.
-    export RUN_TESTS_ARGS="--skip-parallel --skip-stress"
-    export EE_TEST=true
-  elif [[ ${1} = "EE_TEST_PARALLEL" ]]; then
-    export RUN_TESTS_ARGS="--skip-serial"
-    export EE_TEST=true
+  # By default, the JVM will use 1/4 of your OS memory for its heap size. For a
+  # long-running test, this will delay GC inside of impalad's leading to
+  # unnecessarily large process RSS footprints. To combat this, we
+  # set a small initial heap size, and then cap it at a more reasonable
+  # size. The small initial heap sizes help for daemons that do little
+  # in the way of JVM work (e.g., the 2nd and 3rd impalad's).
+  # Note that "test_insert_large_string" fails at 2g and 3g, so the suite that
+  # includes it (EE_TEST_PARALLEL) gets additional memory.
+
+  # Note that we avoid using TEST_START_CLUSTER_ARGS="--jvm-args=..."
+  # because it gets flattened along the way if we need to provide
+  # more than one Java argument. We use JAVA_TOOL_OPTIONS instead.
+  JVM_HEAP_MAX_GB=2
+  if [[ $1 = EE_TEST_PARALLEL ]]; then
+    JVM_HEAP_MAX_GB=4
+  elif [[ $1 = EE_TEST_PARALLEL_EXHAUSTIVE ]]; then
+    JVM_HEAP_MAX_GB=8
   fi
+  JAVA_TOOL_OPTIONS="-Xms512M -Xmx${JVM_HEAP_MAX_GB}G"
+
+  # Similarly, bin/start-impala-cluster typically configures the memlimit
+  # to be 80% of the machine memory, divided by the number of daemons.
+  # If multiple containers are to be run simultaneously, this is scaled
+  # down in test-with-docker.py (and further configurable with --impalad-mem-limit-bytes)
+  # and passed in via $IMPALAD_MEM_LIMIT_BYTES to the container. There is a
+  # relationship between the number of parallel tests that can be run by py.test and this
+  # limit.
+  export TEST_START_CLUSTER_ARGS="--impalad_args=--mem_limit=$IMPALAD_MEM_LIMIT_BYTES"
+
+  export MAX_PYTEST_FAILURES=0
+
+  # Asserting that these should are all set (to either true or false as strings).
+  # This is how run-all.sh chooses between them.
+  [[ $FE_TEST && $BE_TEST && $EE_TEST && $JDBC_TEST && $CLUSTER_TEST ]]
 
   ret=0
 
+  if [[ ${EE_TEST} = true ]]; then
+    # test_insert_parquet.py depends on this binary
+    make -j$(nproc) --load-average=$(nproc) parquet-reader
+  fi
+
   # Run tests.
-  if ! time -p bin/run-all-tests.sh; then
+  (echo ">>> $1: Starting run-all-test") 2> /dev/null
+  if ! time -p bash -x bin/run-all-tests.sh; then
     ret=1
     echo "Tests $1 failed!"
   else
     echo "Tests $1 succeeded!"
   fi
+
+  # Save memory usage after tests have run but before shutting down the cluster.
+  memory_usage || true
+
   # Oddly, I've observed bash fail to exit (and wind down the container),
   # leading to test-with-docker.py hitting a timeout. Killing the minicluster
   # daemons fixes this.
@@ -312,6 +375,8 @@ function main() {
   shift
 
   echo ">>> ${CMD} $@ (begin)"
+  # Dump environment, for debugging
+  env | grep -vE "AWS_(SECRET_)?ACCESS_KEY"
   set -x
   if "${CMD}" "$@"; then
     ret=0

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/monitor.py
----------------------------------------------------------------------
diff --git a/docker/monitor.py b/docker/monitor.py
index 64cde0c..58e2a83 100644
--- a/docker/monitor.py
+++ b/docker/monitor.py
@@ -232,28 +232,37 @@ class Timeline(object):
         if self.interesting_re.search(line)]
     return [(container.name,) + split_timestamp(line) for line in interesting_lines]
 
-  @staticmethod
-  def parse_metrics(f):
+  def parse_metrics(self, f):
     """Parses timestamped metric lines.
 
     Given metrics lines like:
 
     2017-10-25 10:08:30.961510 87d5562a5fe0ea075ebb2efb0300d10d23bfa474645bb464d222976ed872df2a cpu user 33 system 15
 
-    Returns an iterable of (ts, container, user_cpu, system_cpu)
+    Returns an iterable of (ts, container, user_cpu, system_cpu). It also updates
+    container.peak_total_rss and container.total_user_cpu and container.total_system_cpu.
     """
     prev_by_container = {}
+    peak_rss_by_container = {}
     for line in f:
       ts, rest = split_timestamp(line.rstrip())
+      total_rss = None
       try:
         container, metric_type, rest2 = rest.split(" ", 2)
-        if metric_type != "cpu":
-          continue
-        _, user_cpu_s, _, system_cpu_s = rest2.split(" ", 3)
+        if metric_type == "cpu":
+          _, user_cpu_s, _, system_cpu_s = rest2.split(" ", 3)
+        elif metric_type == "memory":
+          memory_metrics = rest2.split(" ")
+          total_rss = int(memory_metrics[memory_metrics.index("total_rss") + 1 ])
       except:
         logging.warning("Skipping metric line: %s", line)
         continue
 
+      if total_rss is not None:
+        peak_rss_by_container[container] = max(peak_rss_by_container.get(container, 0),
+            total_rss)
+        continue
+
       prev_ts, prev_user, prev_system = prev_by_container.get(
           container, (None, None, None))
       user_cpu = int(user_cpu_s)
@@ -267,6 +276,14 @@ class Timeline(object):
               (system_cpu - prev_system)/dt/USER_HZ
       prev_by_container[container] = ts, user_cpu, system_cpu
 
+    # Now update container totals
+    for c in self.containers:
+      if c.id in prev_by_container:
+        _, u, s = prev_by_container[c.id]
+        c.total_user_cpu, c.total_system_cpu = u / USER_HZ, s / USER_HZ
+      if c.id in peak_rss_by_container:
+        c.peak_total_rss = peak_rss_by_container[c.id]
+
   def create(self, output):
     # Read logfiles
     timelines = []

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/test-with-docker.py
----------------------------------------------------------------------
diff --git a/docker/test-with-docker.py b/docker/test-with-docker.py
index 59640b4..c3f4427 100755
--- a/docker/test-with-docker.py
+++ b/docker/test-with-docker.py
@@ -95,14 +95,10 @@ as part of the build, in logs/docker/*/timeline.html.
 # Suggested speed improvement TODOs:
 #   - Speed up testdata generation
 #   - Skip generating test data for variants not being run
-#   - Make container image smaller; perhaps make BE test binaries
-#     smaller
-#   - Split up cluster tests into two groups
+#   - Make container image smaller
 #   - Analyze .xml junit files to find slow tests; eradicate
 #     or move to different suite.
-#   - Avoid building BE tests, and build them during execution,
-#     saving on container space as well as baseline build
-#     time.
+#   - Run BE tests earlier (during data load)
 
 # We do not use Impala's python environment here, nor do we depend on
 # non-standard python libraries to avoid needing extra build steps before
@@ -125,11 +121,11 @@ if __name__ == '__main__' and __package__ is None:
 
 base = os.path.dirname(os.path.abspath(__file__))
 
+LOG_FORMAT="%(asctime)s %(threadName)s: %(message)s"
 
-def main():
 
-  logging.basicConfig(level=logging.INFO,
-                      format='%(asctime)s %(threadName)s: %(message)s')
+def main():
+  logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
 
   default_parallel_test_concurrency, default_suite_concurrency, default_memlimit_gb = \
       _compute_defaults()
@@ -160,20 +156,25 @@ def main():
   parser.add_argument(
       '--build-image', metavar='IMAGE',
       help='Skip building, and run tests on pre-existing image.')
-  parser.add_argument(
+
+  suite_group = parser.add_mutually_exclusive_group()
+  suite_group.add_argument(
       '--suite', metavar='VARIANT', action='append',
-      help="Run specific test suites; can be specified multiple times. \
-          If not specified, all tests are run. Choices: " + ",".join(ALL_SUITES))
+      help="""
+        Run specific test suites; can be specified multiple times.
+        Test-with-docker may shard some suites to improve parallelism.
+        If not specified, default tests are run.
+        Default: %s, All Choices: %s
+        """ % (",".join([ s.name for s in DEFAULT_SUITES]),
+          ",".join([ s.name for s in ALL_SUITES ])))
+  suite_group.add_argument('--all-suites', action='store_true', default=False,
+      help="If set, run all available suites.")
   parser.add_argument(
       '--name', metavar='NAME',
       help="Use a specific name for the test run. The name is used " +
       "as a prefix for the container and image names, and " +
       "as part of the log directory naming. Defaults to include a timestamp.",
       default=datetime.datetime.now().strftime("i-%Y%m%d-%H%M%S"))
-  parser.add_argument('--timeout', metavar='MINUTES',
-                      help="Timeout for test suites, in minutes.",
-                      type=int,
-                      default=60*2)
   parser.add_argument('--ccache-dir', metavar='DIR',
                       help="CCache directory to use",
                       default=os.path.expanduser("~/.ccache"))
@@ -181,22 +182,28 @@ def main():
   args = parser.parse_args()
 
   if not args.suite:
-    args.suite = ALL_SUITES
+    if args.all_suites:
+      # Ignore "NOOP" tasks, as they are just for testing.
+      args.suite = [ s.name for s in ALL_SUITES if not s.name.startswith("NOOP") ]
+    else:
+      args.suite = [ s.name for s in DEFAULT_SUITES ]
   t = TestWithDocker(
-      build_image=args.build_image, suites=args.suite,
-      name=args.name, timeout=args.timeout, cleanup_containers=args.cleanup_containers,
+      build_image=args.build_image, suite_names=args.suite,
+      name=args.name, cleanup_containers=args.cleanup_containers,
       cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
       parallel_test_concurrency=args.parallel_test_concurrency,
       suite_concurrency=args.suite_concurrency,
       impalad_mem_limit_bytes=args.impalad_mem_limit_bytes)
 
-  logging.getLogger('').addHandler(
-      logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt")))
+  fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
+  fh.setFormatter(logging.Formatter(LOG_FORMAT))
+  logging.getLogger('').addHandler(fh)
 
   logging.info("Arguments: %s", args)
 
   ret = t.run()
   t.create_timeline()
+  t.log_summary()
 
   if not ret:
     sys.exit(1)
@@ -229,10 +236,11 @@ def _compute_defaults():
   logging.info("CPUs: %s Memory (GB): %s", cpus, total_memory_gb)
 
   parallel_test_concurrency = min(cpus, 8)
-  memlimit_gb = 7
+  memlimit_gb = 8
 
   if total_memory_gb >= 95:
     suite_concurrency = 4
+    memlimit_gb = 11
     parallel_test_concurrency = min(cpus, 12)
   elif total_memory_gb >= 65:
     suite_concurrency = 3
@@ -244,19 +252,100 @@ def _compute_defaults():
 
   return parallel_test_concurrency, suite_concurrency, memlimit_gb * 1024 * 1024 * 1024
 
+class Suite(object):
+  """Encapsulates a test suite.
+
+  A test suite is a named thing that the user can select to run,
+  and it runs in its own container, in parallel with other suites.
+  The actual running happens from entrypoint.sh and is controlled
+  mostly by environment variables. When complexity is easier
+  to handle in Python (with its richer data types), we prefer
+  it here.
+  """
+  def __init__(self, name, **envs):
+    """Create suite with given name and environment."""
+    self.name = name
+    self.envs = dict(
+        FE_TEST="false",
+        BE_TEST="false",
+        EE_TEST="false",
+        JDBC_TEST="false",
+        CLUSTER_TEST="false")
+    # If set, this suite is sharded past a certain suite concurrency threshold.
+    self.shard_at_concurrency = None
+    # Variable to which to append --shard_tests
+    self.sharding_variable = None
+    self.envs[name] = "true"
+    self.envs.update(envs)
+    self.timeout_minutes = 120
+
+  def copy(self, name, **envs):
+    """Duplicates current suite allowing for environment updates."""
+    v = dict()
+    v.update(self.envs)
+    v.update(envs)
+    ret = Suite(name, **v)
+    ret.shard_at_concurrency = self.shard_at_concurrency
+    ret.sharding_variable = self.sharding_variable
+    ret.timeout_minutes = self.timeout_minutes
+    return ret
+
+  def exhaustive(self):
+    """Returns an "exhaustive" copy of the suite."""
+    r = self.copy(self.name + "_EXHAUSTIVE", EXPLORATION_STRATEGY="exhaustive")
+    r.timeout_minutes = 240
+    return r
+
+  def sharded(self, shards):
+    """Returns a list of sharded copies of the list.
+
+    key is the name of the variable which needs to be appended with "--shard-tests=..."
+    """
+    # RUN_TESTS_ARGS
+    ret = []
+    for i in range(1, shards + 1):
+      s = self.copy("%s_%d_of_%d" % (self.name, i, shards))
+      s.envs[self.sharding_variable] = self.envs.get(self.sharding_variable, "") \
+          + " --shard_tests=%s/%s" % (i, shards)
+      ret.append(s)
+    return ret
 
-# The names of all the test tracks supported.  NOOP isn't included here, but is
-# handy for testing.  These are organized slowest-to-fastest, so that, when
-# parallelism of suites is limited, the total time is not impacted.
-ALL_SUITES = [
-    "EE_TEST_SERIAL",
-    "EE_TEST_PARALLEL",
-    "CLUSTER_TEST",
-    "BE_TEST",
-    "FE_TEST",
-    "JDBC_TEST",
+# Definitions of all known suites:
+ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
+    RUN_TESTS_ARGS="--skip-parallel --skip-stress")
+ee_test_serial.shard_at_concurrency = 4
+ee_test_serial.sharding_variable = "RUN_TESTS_ARGS"
+ee_test_serial_exhaustive = ee_test_serial.exhaustive()
+ee_test_parallel = Suite("EE_TEST_PARALLEL", EE_TEST="true",
+    RUN_TESTS_ARGS="--skip-serial")
+ee_test_parallel_exhaustive = ee_test_parallel.exhaustive()
+cluster_test = Suite("CLUSTER_TEST")
+cluster_test.shard_at_concurrency = 4
+cluster_test.sharding_variable = "RUN_CUSTOM_CLUSTER_TESTS_ARGS"
+cluster_test_exhaustive = cluster_test.exhaustive()
+
+# Default supported suites. These are organized slowest-to-fastest, so that,
+# when parallelism is limited, the total time is least impacted.
+DEFAULT_SUITES = [
+    ee_test_serial,
+    ee_test_parallel,
+    cluster_test,
+    Suite("BE_TEST"),
+    Suite("FE_TEST"),
+    Suite("JDBC_TEST")
 ]
 
+OTHER_SUITES = [
+    ee_test_parallel_exhaustive,
+    ee_test_serial_exhaustive,
+    cluster_test_exhaustive,
+    Suite("RAT_CHECK"),
+    # These are used for testing this script
+    Suite("NOOP"),
+    Suite("NOOP_FAIL"),
+    Suite("NOOP_SLEEP_FOREVER")
+]
+ALL_SUITES = DEFAULT_SUITES + OTHER_SUITES
 
 def _call(args, check=True):
   """Wrapper for calling a subprocess.
@@ -297,6 +386,12 @@ class Container(object):
     self.running = running
     self.start = None
     self.end = None
+    self.removed = False
+
+    # Updated by Timeline class
+    self.total_user_cpu = -1
+    self.total_system_cpu = -1
+    self.peak_total_rss = -1
 
   def runtime_seconds(self):
     if self.start and self.end:
@@ -311,15 +406,13 @@ class Container(object):
 class TestWithDocker(object):
   """Tests Impala using Docker containers for parallelism."""
 
-  def __init__(self, build_image, suites, name, timeout, cleanup_containers,
+  def __init__(self, build_image, suite_names, name, cleanup_containers,
                cleanup_image, ccache_dir, test_mode,
                suite_concurrency, parallel_test_concurrency,
                impalad_mem_limit_bytes):
     self.build_image = build_image
-    self.suites = [TestSuiteRunner(self, suite) for suite in suites]
     self.name = name
     self.containers = []
-    self.timeout_minutes = timeout
     self.git_root = _check_output(["git", "rev-parse", "--show-toplevel"]).strip()
     self.cleanup_containers = cleanup_containers
     self.cleanup_image = cleanup_image
@@ -336,6 +429,26 @@ class TestWithDocker(object):
     self.parallel_test_concurrency = parallel_test_concurrency
     self.impalad_mem_limit_bytes = impalad_mem_limit_bytes
 
+    # Map suites back into objects; we ignore case for this mapping.
+    suites = []
+    suites_by_name = {}
+    for suite in ALL_SUITES:
+      suites_by_name[suite.name.lower()] = suite
+    for suite_name in suite_names:
+      suites.append(suites_by_name[suite_name.lower()])
+
+    # If we have enough concurrency, shard some suites into two halves.
+    suites2 = []
+    for suite in suites:
+      if suite.shard_at_concurrency is not None and \
+          suite_concurrency >= suite.shard_at_concurrency:
+        suites2.extend(suite.sharded(2))
+      else:
+        suites2.append(suite)
+    suites = suites2
+
+    self.suite_runners = [TestSuiteRunner(self, suite) for suite in suites]
+
   def _create_container(self, image, name, logdir, logname, entrypoint, extras=None):
     """Returns a new container.
 
@@ -374,8 +487,10 @@ class TestWithDocker(object):
         + extras
         + [image]
         + entrypoint).strip()
-    return Container(name=name, id_=container_id,
+    ctr = Container(name=name, id_=container_id,
                      logfile=os.path.join(self.log_dir, logdir, logname))
+    logging.info("Created container %s", ctr)
+    return ctr
 
   def _run_container(self, container):
     """Runs container, and returns True if the container had a successful exit value.
@@ -413,15 +528,17 @@ class TestWithDocker(object):
   @staticmethod
   def _stop_container(container):
     """Stops container. Ignores errors (e.g., if it's already exited)."""
-    _call(["docker", "stop", container.id], check=False)
     if container.running:
+      _call(["docker", "stop", container.id], check=False)
       container.end = time.time()
       container.running = False
 
   @staticmethod
   def _rm_container(container):
     """Removes container."""
-    _call(["docker", "rm", container.id], check=False)
+    if not container.removed:
+      _call(["docker", "rm", container.id], check=False)
+      container.removed = True
 
   def _create_build_image(self):
     """Creates the "build image", with Impala compiled and data loaded."""
@@ -451,36 +568,36 @@ class TestWithDocker(object):
         self._rm_container(container)
 
   def _run_tests(self):
-    start_time = time.time()
-    timeout_seconds = self.timeout_minutes * 60
-    deadline = start_time + timeout_seconds
     pool = multiprocessing.pool.ThreadPool(processes=self.suite_concurrency)
     outstanding_suites = []
-    for suite in self.suites:
+    for suite in self.suite_runners:
       suite.task = pool.apply_async(suite.run)
       outstanding_suites.append(suite)
 
     ret = True
-    while time.time() < deadline and len(outstanding_suites) > 0:
-      for suite in list(outstanding_suites):
-        task = suite.task
-        if task.ready():
-          this_task_ret = task.get()
-          outstanding_suites.remove(suite)
-          if this_task_ret:
-            logging.info("Suite %s succeeded.", suite.name)
-          else:
-            logging.info("Suite %s failed.", suite.name)
-            ret = False
-      time.sleep(10)
-    if len(outstanding_suites) > 0:
-      for container in self.containers:
-        self._stop_container(container)
-      for suite in outstanding_suites:
-        suite.task.get()
-      raise Exception("Tasks not finished within timeout (%s minutes): %s" %
-                      (self.timeout_minutes, ",".join([
-                          suite.name for suite in outstanding_suites])))
+    try:
+      while len(outstanding_suites) > 0:
+        for suite in list(outstanding_suites):
+          if suite.timed_out():
+            msg = "Task %s not finished within timeout %s" % (suite.name,
+                suite.suite.timeout_minutes,)
+            logging.error(msg)
+            raise Exception(msg)
+          task = suite.task
+          if task.ready():
+            this_task_ret = task.get()
+            outstanding_suites.remove(suite)
+            if this_task_ret:
+              logging.info("Suite %s succeeded.", suite.name)
+            else:
+              logging.info("Suite %s failed.", suite.name)
+              ret = False
+        time.sleep(5)
+    except KeyboardInterrupt:
+      logging.info("\n\nDetected KeyboardInterrupt; shutting down!\n\n")
+      raise
+    finally:
+      pool.terminate()
     return ret
 
   def run(self):
@@ -495,17 +612,13 @@ class TestWithDocker(object):
       else:
         self.image = self.build_image
       ret = self._run_tests()
-      logging.info("Containers:")
-      for c in self.containers:
-        def to_success_string(exitcode):
-          if exitcode == 0:
-            return "SUCCESS"
-          return "FAILURE"
-        logging.info("%s %s %s %s", to_success_string(c.exitcode), c.name, c.logfile,
-                     c.runtime_seconds())
       return ret
     finally:
       self.monitor.stop()
+      if self.cleanup_containers:
+        for c in self.containers:
+          self._stop_container(c)
+          self._rm_container(c)
       if self.cleanup_image and self.image:
         _call(["docker", "rmi", self.image], check=False)
       logging.info("Memory usage: %s GB min, %s GB max",
@@ -526,6 +639,23 @@ class TestWithDocker(object):
         interesting_re=self._INTERESTING_RE)
     timeline.create(os.path.join(self.log_dir, "timeline.html"))
 
+  def log_summary(self):
+    logging.info("Containers:")
+    def to_success_string(exitcode):
+      if exitcode == 0:
+        return "SUCCESS"
+      return "FAILURE"
+
+    for c in self.containers:
+      logging.info("%s %s %s %0.1fm wall, %0.1fm user, %0.1fm system, " +
+            "%0.1fx parallelism, %0.1f GB peak RSS",
+          to_success_string(c.exitcode), c.name, c.logfile,
+          c.runtime_seconds() / 60.0,
+          c.total_user_cpu / 60.0,
+          c.total_system_cpu / 60.0,
+          (c.total_user_cpu + c.total_system_cpu) / max(c.runtime_seconds(), 0.0001),
+          c.peak_total_rss / 1024.0 / 1024.0 / 1024.0)
+
 
 class TestSuiteRunner(object):
   """Runs a single test suite."""
@@ -534,12 +664,24 @@ class TestSuiteRunner(object):
     self.test_with_docker = test_with_docker
     self.suite = suite
     self.task = None
-    self.name = self.suite.lower()
+    self.name = suite.name.lower()
+    # Set at the beginning of run and facilitates enforcing timeouts
+    # for individual suites.
+    self.deadline = None
+
+  def timed_out(self):
+    return self.deadline is not None and time.time() > self.deadline
 
   def run(self):
     """Runs given test. Returns true on success, based on exit code."""
+    self.deadline = time.time() + self.suite.timeout_minutes * 60
     test_with_docker = self.test_with_docker
     suite = self.suite
+    envs = ["-e", "NUM_CONCURRENT_TESTS=" + str(test_with_docker.parallel_test_concurrency)]
+    for k, v in sorted(suite.envs.iteritems()):
+      envs.append("-e")
+      envs.append("%s=%s" % (k, v))
+
     self.start = time.time()
 
     # io-file-mgr-test expects a real-ish file system at /tmp;
@@ -554,13 +696,11 @@ class TestSuiteRunner(object):
         name=container_name,
         extras=[
             "-v", tmpdir + ":/tmp",
-            "-u", str(os.getuid()),
-            "-e", "NUM_CONCURRENT_TESTS=" +
-            str(test_with_docker.parallel_test_concurrency),
-        ],
+            "-u", str(os.getuid())
+        ] + envs,
         logdir=self.name,
-        logname="log-test-" + self.suite + ".txt",
-        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite])
+        logname="log-test-" + self.suite.name + ".txt",
+        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite.name])
 
     test_with_docker.containers.append(container)
     test_with_docker.monitor.add(container)
@@ -569,7 +709,7 @@ class TestSuiteRunner(object):
     except:
       return False
     finally:
-      logging.info("Cleaning up containers for %s" % (suite,))
+      logging.info("Cleaning up containers for %s" % (suite.name,))
       test_with_docker._stop_container(container)
       if test_with_docker.cleanup_containers:
         test_with_docker._rm_container(container)

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/timeline.html.template
----------------------------------------------------------------------
diff --git a/docker/timeline.html.template b/docker/timeline.html.template
index c8de821..d2960d7 100644
--- a/docker/timeline.html.template
+++ b/docker/timeline.html.template
@@ -96,9 +96,7 @@ function ts_to_date(secs) {
 }
 
 function drawChart() {
-  var container = document.getElementById('container');
-  var timelineContainer = document.createElement("div");
-  container.appendChild(timelineContainer);
+  var timelineContainer = document.getElementById('timelineContainer');
   var chart = new google.visualization.Timeline(timelineContainer);
   var dataTable = new google.visualization.DataTable();
   dataTable.addColumn({ type: 'string', id: 'Position' });
@@ -115,7 +113,7 @@ function drawChart() {
 
   for (const k of Object.keys(data.metrics)) {
     var lineChart = document.createElement("div");
-    container.appendChild(lineChart);
+    lineChartContainer.appendChild(lineChart);
 
     var dataTable = new google.visualization.DataTable();
     dataTable.addColumn({ type: 'timeofday', id: 'Time' });
@@ -139,4 +137,5 @@ function drawChart() {
   }
 }
 </script>
-<div id="container" style="height: 200px;"></div>
+<div id="timelineContainer" style="height: 400px;"></div>
+<div id="lineChartContainer" style="height: 200px;"></div>

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/testdata/bin/run-hbase.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/run-hbase.sh b/testdata/bin/run-hbase.sh
index f264b65..241ccd2 100755
--- a/testdata/bin/run-hbase.sh
+++ b/testdata/bin/run-hbase.sh
@@ -36,6 +36,7 @@ cat > ${HBASE_CONF_DIR}/hbase-env.sh <<EOF
 export JAVA_HOME=${JAVA_HOME}
 export HBASE_LOG_DIR=${HBASE_LOGDIR}
 export HBASE_PID_DIR=${HBASE_LOGDIR}
+export HBASE_HEAPSIZE=1g
 EOF
 
 # Put zookeeper things in the logs/cluster/zoo directory.

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
----------------------------------------------------------------------
diff --git a/testdata/cluster/node_templates/common/etc/init.d/hdfs-common b/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
index 9a7ddd3..d53ecce 100644
--- a/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
+++ b/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
@@ -18,3 +18,10 @@
 export HADOOP_LOG_DIR="$LOG_DIR/hadoop-hdfs"
 export HADOOP_ROOT_LOGGER="${HADOOP_ROOT_LOGGER:-INFO,RFA}"
 export HADOOP_LOGFILE=$(basename $0).log
+
+# Force minicluster processes to have a maximum heap.
+# If unset, on large machines, the JVM default
+# is 1/4th of the RAM (or so), and processes like DataNode
+# end up never garbage collecting.
+export HADOOP_HEAPSIZE_MIN=512m
+export HADOOP_HEAPSIZE_MAX=2g

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index 144fc5c..1e6adda 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -120,6 +120,10 @@ def pytest_addoption(parser):
                    default=False, help="Run all tests with KRPC disabled. This assumes "
                    "that the test cluster has been started with --disable_krpc.")
 
+  parser.addoption("--shard_tests", default=None,
+                   help="If set to N/M (e.g., 3/5), will split the tests into "
+                   "M partitions and run the Nth partition. 1-indexed.")
+
 
 def pytest_assertrepr_compare(op, left, right):
   """
@@ -501,3 +505,34 @@ def validate_pytest_config():
     if any(pytest.config.option.impalad.startswith(loc) for loc in local_prefixes):
       logging.error("--testing_remote_cluster can not be used with a local impalad")
       pytest.exit("Invalid pytest config option: --testing_remote_cluster")
+
+
+@pytest.hookimpl(trylast=True)
+def pytest_collection_modifyitems(items, config, session):
+  """Hook to handle --shard_tests command line option.
+
+  If set, this "deselects" a subset of tests, by hashing
+  their id into buckets.
+  """
+  if not config.option.shard_tests:
+    return
+
+  num_items = len(items)
+  this_shard, num_shards = map(int, config.option.shard_tests.split("/"))
+  assert 0 <= this_shard <= num_shards
+  if this_shard == num_shards:
+    this_shard = 0
+
+  items_selected, items_deselected = [], []
+  for i in items:
+    if hash(i.nodeid) % num_shards == this_shard:
+      items_selected.append(i)
+    else:
+      items_deselected.append(i)
+  config.hook.pytest_deselected(items=items_deselected)
+
+  # We must modify the items list in place for it to take effect.
+  items[:] = items_selected
+
+  logging.info("pytest shard selection enabled %s. Of %d items, selected %d items by hash.",
+      config.option.shard_tests, num_items, len(items))

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/tests/run-tests.py
----------------------------------------------------------------------
diff --git a/tests/run-tests.py b/tests/run-tests.py
index 95e0d11..b1a9fdd 100755
--- a/tests/run-tests.py
+++ b/tests/run-tests.py
@@ -180,6 +180,10 @@ def build_test_args(base_name, valid_dirs=VALID_TEST_DIRS):
     #
     explicit_tests = pytest.config.getoption(FILE_OR_DIR)
     config_options = [arg for arg in commandline_args if arg not in explicit_tests]
+    # We also want to strip out any --shard_tests option and its corresponding value.
+    while "--shard_tests" in config_options:
+      i = config_options.index("--shard_tests")
+      del config_options[i:i+2]
     test_args = ignored_dirs + logging_args + config_options
 
   return test_args
@@ -237,6 +241,11 @@ if __name__ == "__main__":
     test_executor.run_tests(sys.argv[1:])
     sys.exit(0)
 
+  def run(args):
+    """Helper to print out arguments of test_executor before invoking."""
+    print "Running TestExecutor with args: %s" % (args,)
+    test_executor.run_tests(args)
+
   os.chdir(TEST_DIR)
 
   # Create the test result directory if it doesn't already exist.
@@ -248,25 +257,25 @@ if __name__ == "__main__":
   # pytest warnings/messages and displays collected tests
 
   if '--collect-only' in sys.argv:
-    test_executor.run_tests(sys.argv[1:])
+    run(sys.argv[1:])
   else:
     print_metrics('connections')
     # First run query tests that need to be executed serially
     if not skip_serial:
       base_args = ['-m', 'execute_serially']
-      test_executor.run_tests(base_args + build_test_args('serial'))
+      run(base_args + build_test_args('serial'))
       print_metrics('connections')
 
     # Run the stress tests tests
     if not skip_stress:
       base_args = ['-m', 'stress', '-n', NUM_STRESS_CLIENTS]
-      test_executor.run_tests(base_args + build_test_args('stress'))
+      run(base_args + build_test_args('stress'))
       print_metrics('connections')
 
     # Run the remaining query tests in parallel
     if not skip_parallel:
       base_args = ['-m', 'not execute_serially and not stress', '-n', NUM_CONCURRENT_TESTS]
-      test_executor.run_tests(base_args + build_test_args('parallel'))
+      run(base_args + build_test_args('parallel'))
 
     # The total number of tests executed at this point is expected to be >0
     # If it is < 0 then the script needs to exit with a non-zero
@@ -277,7 +286,7 @@ if __name__ == "__main__":
     # Finally, validate impalad/statestored metrics.
     args = build_test_args(base_name='verify-metrics', valid_dirs=['verifiers'])
     args.append('verifiers/test_verify_metrics.py')
-    test_executor.run_tests(args)
+    run(args)
 
   if test_executor.tests_failed:
     sys.exit(1)


[4/4] impala git commit: IMPALA-6821: Push down limits into Kudu

Posted by mi...@apache.org.
IMPALA-6821: Push down limits into Kudu

This patch takes advantage of a recent change in Kudu (KUDU-16) that
exposes the ability to set limits on KuduScanners. Since each
KuduScanner corresponds to a scan token, and there will be multiple
scan tokens per query, this is just a performance optimization in
cases where the limit is smaller than the number of rows per token,
and Impala still needs to apply the limit on our side for cases where
the limit is greater than the number of rows per token.

Testing:
- Added e2e tests for various situations where limits are applied at
  a Kudu scan node.
- For the query 'select * from tpch_kudu.lineitem limit 1', a best
  case perf scenario for this change where the limit is highly
  effective, the time spent in the Kudu scan node was reduced from
  6.107ms to 3.498ms (avg over 3 runs).
- For the query 'select count(*) from (select * from
  tpch_kudu.lineitem limit 1000000) v', a worst case perf scenario for
  this change where the limit is ineffective, the time spent in the
  Kudu scan node was essentially unchanged, 32.815ms previously vs.
  29.532ms (avg over 3 runs).

Change-Id: Ibe35e70065d8706b575e24fe20902cd405b49941
Reviewed-on: http://gerrit.cloudera.org:8080/10119
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/955ad083
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/955ad083
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/955ad083

Branch: refs/heads/2.x
Commit: 955ad0833fdbe61ebb29d32e9b04757b467917be
Parents: 6472ccd
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Fri Apr 13 20:38:16 2018 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 1 03:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc                     |   5 +
 .../queries/QueryTest/kudu_limit.test           | 112 +++++++++++++++++++
 tests/query_test/test_kudu.py                   |   3 +
 3 files changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/955ad083/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 3bc4441..9e90bdb 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -226,6 +226,11 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
     }
   }
 
+  if (scan_node_->limit() != -1 && conjunct_evals_.empty()) {
+    KUDU_RETURN_IF_ERROR(scanner_->SetLimit(scan_node_->limit()),
+        "Failed to set limit on scan.");
+  }
+
   {
     SCOPED_TIMER(state_->total_storage_wait_timer());
     KUDU_RETURN_IF_ERROR(scanner_->Open(), "Unable to open scanner");

http://git-wip-us.apache.org/repos/asf/impala/blob/955ad083/testdata/workloads/functional-query/queries/QueryTest/kudu_limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_limit.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_limit.test
new file mode 100644
index 0000000..c72afc9
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_limit.test
@@ -0,0 +1,112 @@
+====
+---- QUERY
+# limit 0
+select * from functional_kudu.alltypes limit 0
+---- RESULTS
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
+====
+---- QUERY
+# no predicate, nondeterministic (no order by) so only check number of rows returned
+select count(*) from (select * from functional_kudu.alltypes limit 2) v
+---- RESULTS
+2
+---- TYPES
+bigint
+====
+---- QUERY
+# no predicate, deterministic (limit doesn't exclude any rows) so check actual values
+select * from functional_kudu.alltypestiny limit 100
+---- RESULTS : VERIFY_IS_EQUAL_SORTED
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
+====
+---- QUERY
+# Kudu predicate on PK col, nondeterministic
+select count(id) from (select * from functional_kudu.alltypes where id > 0 limit 3) v
+---- RESULTS
+3
+---- TYPES
+bigint
+====
+---- QUERY
+# Kudu predicate on PK col, deterministic
+select * from functional_kudu.alltypestiny where id > 4 limit 3
+---- RESULTS : VERIFY_IS_EQUAL_SORTED
+5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
+====
+---- QUERY
+# Kudu predicate on non-PK col, nondeterministic
+select count(tinyint_col) from (select * from functional_kudu.alltypes where tinyint_col = 6 limit 4) v
+---- RESULTS
+4
+---- TYPES
+bigint
+====
+---- QUERY
+# Kudu predicate on non-PK col, deterministic
+select * from functional_kudu.alltypestiny where tinyint_col = 1 limit 4
+---- RESULTS : VERIFY_IS_EQUAL_SORTED
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
+====
+---- QUERY
+# Impala predicate, nondeterministic
+select count(string_col) from (select * from functional_kudu.alltypes where id % 2 = 0 limit 2) v
+---- RESULTS
+2
+---- TYPES
+bigint
+====
+---- QUERY
+# Impala predicate, deterministic
+select * from functional_kudu.alltypestiny where id % 2 = 0 limit 100
+---- RESULTS : VERIFY_IS_EQUAL_SORTED
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
+====
+---- QUERY
+# Both Impala and Kudu predicates, nondeterministic
+select count(month) from
+  (select * from functional_kudu.alltypes where id % 2 = 0 and id > 4 limit 5) v
+---- RESULTS
+5
+---- TYPES
+bigint
+====
+---- QUERY
+# Both Impala and Kudu predicates, deterministic
+select * from functional_kudu.alltypestiny where id % 2 = 0 and id > 4 limit 5
+---- RESULTS
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
+====
+---- QUERY
+# large limit, nondeterministic
+select count(*) from (select * from functional_kudu.alltypes where id % 2 = 0 and id > 1 limit 1000) v;
+---- RESULTS
+1000
+---- TYPES
+bigint
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/955ad083/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index b0ced46..ae76f75 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -107,6 +107,9 @@ class TestKuduOperations(KuduTestSuite):
   def test_kudu_describe(self, vector, unique_database):
     self.run_test_case('QueryTest/kudu_describe', vector, use_db=unique_database)
 
+  def test_kudu_limit(self, vector, unique_database):
+    self.run_test_case('QueryTest/kudu_limit', vector, use_db=unique_database)
+
   def test_kudu_column_options(self, cursor, kudu_client, unique_database):
     """Test Kudu column options"""
     encodings = ["ENCODING PLAIN_ENCODING", ""]


[3/4] impala git commit: IMPALA-6314: Add run time scalar subquery check for uncorrelated subqueries

Posted by mi...@apache.org.
IMPALA-6314: Add run time scalar subquery check for uncorrelated subqueries

If a scalar subquery is used with a binary predicate,
or, used in an arithmetic expression, it must return
only one row/column to be valid. If this cannot be
guaranteed at parse time through a single row aggregate
or limit clause, Impala fails the query like such.

E.g., currently the following query is not allowed:
SELECT bigint_col
FROM alltypesagg
WHERE id = (SELECT id FROM alltypesagg WHERE id = 1)

However, it would be allowed if the query contained
a LIMIT 1 clause, or instead of id it was max(id).

This commit makes the example valid by introducing a
runtime check to test if the subquery returns a single
row. If the subquery returns more than one row, it
aborts the query with an error.

I added a new node type, called CardinalityCheckNode. It
is created during planning on top of the subquery when
needed, then during execution it checks if its child
only returns a single row.

I extended the frontend tests and e2e tests as well.

Change-Id: I0f52b93a60eeacedd242a2f17fa6b99c4fc38e06
Reviewed-on: http://gerrit.cloudera.org:8080/9005
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/6472ccdd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/6472ccdd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/6472ccdd

Branch: refs/heads/2.x
Commit: 6472ccdd6ae5e386597020652ea4778ce793e407
Parents: d733ea6
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Wed Apr 4 17:38:59 2018 +0200
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 1 03:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/cardinality-check-node.cc           | 110 +++++
 be/src/exec/cardinality-check-node.h            |  61 +++
 be/src/exec/exec-node.cc                        |   4 +
 common/thrift/PlanNodes.thrift                  |  10 +-
 .../apache/impala/analysis/BinaryPredicate.java |  16 +-
 .../impala/analysis/ComputeStatsStmt.java       |   2 +-
 .../apache/impala/analysis/CreateViewStmt.java  |   6 +-
 .../apache/impala/analysis/ExistsPredicate.java |   6 +
 .../java/org/apache/impala/analysis/Expr.java   |  15 +-
 .../apache/impala/analysis/HdfsCachingOp.java   |   9 +-
 .../org/apache/impala/analysis/InPredicate.java |   1 +
 .../apache/impala/analysis/IsNullPredicate.java |   3 +-
 .../org/apache/impala/analysis/QueryStmt.java   |  17 +
 .../org/apache/impala/analysis/SelectStmt.java  |   4 +-
 .../apache/impala/analysis/StmtRewriter.java    |  32 +-
 .../org/apache/impala/analysis/Subquery.java    |   5 +-
 .../org/apache/impala/analysis/UnionStmt.java   |   1 +
 .../impala/planner/CardinalityCheckNode.java    |  91 ++++
 .../impala/planner/DistributedPlanner.java      |  21 +
 .../impala/planner/SingleNodePlanner.java       |   8 +
 .../impala/analysis/AnalyzeStmtsTest.java       |   2 +-
 .../impala/analysis/AnalyzeSubqueriesTest.java  |  71 ++--
 .../org/apache/impala/analysis/ToSqlTest.java   |  16 +-
 .../queries/PlannerTest/nested-collections.test |  66 +++
 .../queries/PlannerTest/subquery-rewrite.test   | 422 +++++++++++++++++++
 .../queries/QueryTest/nested-types-subplan.test |  38 ++
 .../queries/QueryTest/subquery.test             | 171 ++++++++
 28 files changed, 1130 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7224df8..ccec3fb 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -28,6 +28,7 @@ add_library(Exec
   analytic-eval-node.cc
   base-sequence-scanner.cc
   blocking-join-node.cc
+  cardinality-check-node.cc
   catalog-op-executor.cc
   data-sink.cc
   data-source-scan-node.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/be/src/exec/cardinality-check-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/cardinality-check-node.cc b/be/src/exec/cardinality-check-node.cc
new file mode 100644
index 0000000..76579dd
--- /dev/null
+++ b/be/src/exec/cardinality-check-node.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/cardinality-check-node.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+CardinalityCheckNode::CardinalityCheckNode(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+    : ExecNode(pool, tnode, descs),
+      display_statement_(tnode.cardinality_check_node.display_statement) {
+}
+
+Status CardinalityCheckNode::Prepare(RuntimeState* state) {
+  DCHECK(conjuncts_.empty());
+  DCHECK_EQ(limit_, 1);
+
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+  return Status::OK();
+}
+
+Status CardinalityCheckNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_ERROR(child(0)->Open(state));
+  row_batch_.reset(
+      new RowBatch(row_desc(), 1, mem_tracker()));
+
+  // Read rows from the child, raise error if there are more rows than one
+  RowBatch child_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+  bool child_eos = false;
+  int rows_collected = 0;
+  do {
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(QueryMaintenance(state));
+    RETURN_IF_ERROR(child(0)->GetNext(state, &child_batch, &child_eos));
+
+    rows_collected += child_batch.num_rows();
+    if (rows_collected > 1) {
+      return Status(Substitute("Subquery must not return more than one row: $0",
+          display_statement_));
+    }
+    if (child_batch.num_rows() != 0) child_batch.DeepCopyTo(row_batch_.get());
+    child_batch.Reset();
+  } while (!child_eos);
+
+  DCHECK(rows_collected == 0 || rows_collected == 1);
+
+  // If we are inside a subplan we can expect a call to Open()/GetNext()
+  // on the child again.
+  if (!IsInSubplan()) child(0)->Close(state);
+  return Status::OK();
+}
+
+Status CardinalityCheckNode::GetNext(RuntimeState* state, RowBatch* output_row_batch,
+    bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  DCHECK_LE(row_batch_->num_rows(), 1);
+
+  if (row_batch_->num_rows() == 1) {
+    TupleRow* src_row = row_batch_->GetRow(0);
+    TupleRow* dst_row = output_row_batch->GetRow(output_row_batch->AddRow());
+    output_row_batch->CopyRow(src_row, dst_row);
+    output_row_batch->CommitLastRow();
+    row_batch_->TransferResourceOwnership(output_row_batch);
+    num_rows_returned_ = 1;
+    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  }
+  *eos = true;
+  row_batch_->Reset();
+  return Status::OK();
+}
+
+Status CardinalityCheckNode::Reset(RuntimeState* state) {
+  row_batch_->Reset();
+  return ExecNode::Reset(state);
+}
+
+void CardinalityCheckNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  // Need to call destructor to release resources before calling ExecNode::Close().
+  row_batch_.reset();
+  ExecNode::Close(state);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/be/src/exec/cardinality-check-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/cardinality-check-node.h b/be/src/exec/cardinality-check-node.h
new file mode 100644
index 0000000..c71bd2b
--- /dev/null
+++ b/be/src/exec/cardinality-check-node.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_CARDINALITY_CHECK_NODE_H
+#define IMPALA_EXEC_CARDINALITY_CHECK_NODE_H
+
+#include "exec/exec-node.h"
+#include <boost/scoped_ptr.hpp>
+
+namespace impala {
+
+/// Node that returns an error if its child produces more than a single row.
+/// If successful, this node returns a deep copy of its single input row.
+///
+/// Note that this node must be a blocking node. It would be incorrect to return rows
+/// before the single row constraint has been validated because downstream exec nodes
+/// might produce results and incorrectly return them to the client. If the child of this
+/// node produces more than one row it means the SQL query is semantically invalid, so no
+/// rows must be returned to the client.
+class CardinalityCheckNode : public ExecNode {
+ public:
+  CardinalityCheckNode(ObjectPool* pool, const TPlanNode& tnode,
+      const DescriptorTbl& descs);
+
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  virtual Status Reset(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
+ private:
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Row batch that contains a single row from child
+  boost::scoped_ptr<RowBatch> row_batch_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+
+  // The associated SQL statement for error reporting
+  std::string display_statement_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 8553459..37e6602 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -30,6 +30,7 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exec/analytic-eval-node.h"
+#include "exec/cardinality-check-node.h"
 #include "exec/data-source-scan-node.h"
 #include "exec/empty-set-node.h"
 #include "exec/exchange-node.h"
@@ -401,6 +402,9 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
     case TPlanNodeType::UNNEST_NODE:
       *node = pool->Add(new UnnestNode(pool, tnode, descs));
       break;
+    case TPlanNodeType::CARDINALITY_CHECK_NODE:
+      *node = pool->Add(new CardinalityCheckNode(pool, tnode, descs));
+      break;
     default:
       map<int, const char*>::const_iterator i =
           _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c5df1cd..01698ce 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -46,7 +46,8 @@ enum TPlanNodeType {
   SINGULAR_ROW_SRC_NODE,
   UNNEST_NODE,
   SUBPLAN_NODE,
-  KUDU_SCAN_NODE
+  KUDU_SCAN_NODE,
+  CARDINALITY_CHECK_NODE
 }
 
 // phases of an execution node
@@ -519,6 +520,11 @@ struct TBackendResourceProfile {
   4: optional i64 max_row_buffer_size
 }
 
+struct TCardinalityCheckNode {
+  // Associated statement of child
+  1: required string display_statement
+}
+
 // This is essentially a union of all messages corresponding to subclasses
 // of PlanNode.
 struct TPlanNode {
@@ -567,6 +573,8 @@ struct TPlanNode {
 
   // Resource profile for this plan node.
   25: required TBackendResourceProfile resource_profile
+
+  26: optional TCardinalityCheckNode cardinality_check_node
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
index 444c003..e15b91f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
@@ -173,20 +173,8 @@ public class BinaryPredicate extends Predicate {
     fn_ = getBuiltinFunction(analyzer, opName, collectChildReturnTypes(),
         CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
     if (fn_ == null) {
-      // Construct an appropriate error message and throw an AnalysisException.
-      String errMsg = "operands of type " + getChild(0).getType().toSql() + " and " +
-            getChild(1).getType().toSql()  + " are not comparable: " + toSql();
-
-      // Check if any of the children is a Subquery that does not return a
-      // scalar.
-      for (Expr expr: children_) {
-        if (expr instanceof Subquery && !expr.getType().isScalarType()) {
-          errMsg = "Subquery must return a single row: " + expr.toSql();
-          break;
-        }
-      }
-
-      throw new AnalysisException(errMsg);
+      throw new AnalysisException("operands of type " + getChild(0).getType().toSql() +
+          " and " + getChild(1).getType().toSql()  + " are not comparable: " + toSql());
     }
     Preconditions.checkState(fn_.getReturnType().isBoolean());
 

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index e442d66..f20e1c7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -776,7 +776,7 @@ public class ComputeStatsStmt extends StatementBase {
       return "COMPUTE STATS " + tableName_.toSql() + columnList.toString() + tblsmpl;
     } else {
       return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() +
-          partitionSet_ == null ? "" : partitionSet_.toSql();
+          (partitionSet_ == null ? "" : partitionSet_.toSql());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
index 6b90083..6e98fe5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
@@ -72,9 +72,9 @@ public class CreateViewStmt extends CreateOrAlterViewStmtBase {
     sb.append("CREATE VIEW ");
     if (ifNotExists_) sb.append("IF NOT EXISTS ");
     if (tableName_.getDb() != null) sb.append(tableName_.getDb() + ".");
-    sb.append(tableName_.getTbl() + " (");
-    sb.append(Joiner.on(", ").join(columnDefs_));
-    sb.append(") AS ");
+    sb.append(tableName_.getTbl());
+    if (columnDefs_ != null) sb.append("(" + Joiner.on(", ").join(columnDefs_) + ")");
+    sb.append(" AS ");
     sb.append(viewDefStmt_.toSql());
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java b/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
index 381848c..8da020f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
@@ -55,6 +55,12 @@ public class ExistsPredicate extends Predicate {
   }
 
   @Override
+  protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
+    super.analyzeImpl(analyzer);
+    ((Subquery)children_.get(0)).getStatement().setIsRuntimeScalar(false);
+  }
+
+  @Override
   protected void toThrift(TExprNode msg) {
     // Cannot serialize a nested predicate
     Preconditions.checkState(false);

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index ef53476..c519ea4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -152,6 +152,16 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         }
       };
 
+  // Returns true if an Expr is a user-defined aggregate function.
+  public final static com.google.common.base.Predicate<Expr> IS_UDA_FN =
+      new com.google.common.base.Predicate<Expr>() {
+        @Override
+        public boolean apply(Expr arg) {
+          return isAggregatePredicate_.apply(arg) &&
+              !((FunctionCallExpr)arg).getFnName().isBuiltin();
+        }
+      };
+
   public final static com.google.common.base.Predicate<Expr> IS_TRUE_LITERAL =
       new com.google.common.base.Predicate<Expr>() {
         @Override
@@ -1189,7 +1199,10 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
    */
   public boolean isScalarSubquery() {
     Preconditions.checkState(isAnalyzed_);
-    return this instanceof Subquery && getType().isScalarType();
+    if (!(this instanceof Subquery)) return false;
+    Subquery subq = (Subquery) this;
+    SelectStmt stmt = (SelectStmt) subq.getStatement();
+    return stmt.returnsSingleRow() && getType().isScalarType();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/HdfsCachingOp.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/HdfsCachingOp.java b/fe/src/main/java/org/apache/impala/analysis/HdfsCachingOp.java
index 0ee274c..4f10a1b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/HdfsCachingOp.java
+++ b/fe/src/main/java/org/apache/impala/analysis/HdfsCachingOp.java
@@ -79,8 +79,13 @@ public class HdfsCachingOp implements ParseNode {
 
   @Override
   public String toSql() {
-    return !shouldCache() ? "UNCACHED" : "CACHED IN '" + getCachePoolName() + "' WITH " +
-        "REPLICATION = " + parsedReplication_.longValue();
+    if (!shouldCache()) return "UNCACHED";
+    StringBuilder sb = new StringBuilder();
+    sb.append("CACHED IN '" + getCachePoolName() + "'");
+    if (parsedReplication_ != null) {
+      sb.append(" WITH REPLICATION = " + parsedReplication_.longValue());
+    }
+    return sb.toString();
   }
 
   public THdfsCachingOp toThrift() { return cacheOp_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InPredicate.java b/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
index f840ef2..89479ad 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
@@ -114,6 +114,7 @@ public class InPredicate extends Predicate {
             toSqlImpl());
       }
       Subquery subquery = (Subquery)getChild(1);
+      subquery.getStatement().setIsRuntimeScalar(false);
       if (!subquery.returnsScalarColumn()) {
         throw new AnalysisException("Subquery must return a single column: " +
             subquery.toSql());

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java b/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
index 5a1fe99..2629be2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
@@ -104,7 +104,8 @@ public class IsNullPredicate extends Predicate {
         // a null value.
         setChild(0, new BoolLiteral(true));
         getChild(0).analyze(analyzer);
-      } else if (!getChild(0).contains(Expr.IS_SCALAR_SUBQUERY)) {
+      } else if (!getChild(0).contains(Expr.IS_SCALAR_SUBQUERY) &&
+          !getChild(0).getSubquery().getStatement().isRuntimeScalar()) {
         // We only support scalar subqueries in an IS NULL predicate because
         // they can be rewritten into a join.
         // TODO: Add support for InPredicates and BinaryPredicates with

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index d631bd8..1dd55a0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -88,6 +88,16 @@ public abstract class QueryStmt extends StatementBase {
   /////////////////////////////////////////
   // END: Members that need to be reset()
 
+  // Contains the post-analysis toSql() string before rewrites. I.e. table refs are
+  // resolved and fully qualified, but no rewrites happened yet. This string is showed
+  // to the user in some cases in order to display a statement that is very similar
+  // to what was originally issued.
+  protected String origSqlString_ = null;
+
+  // If true, we need a runtime check on this statement's result to check if it
+  // returns a single row.
+  protected boolean isRuntimeScalar_ = false;
+
   QueryStmt(ArrayList<OrderByElement> orderByElements, LimitElement limitElement) {
     orderByElements_ = orderByElements;
     sortInfo_ = null;
@@ -372,6 +382,11 @@ public abstract class QueryStmt extends StatementBase {
   public WithClause getWithClause() { return withClause_; }
   public boolean hasOrderByClause() { return orderByElements_ != null; }
   public boolean hasLimit() { return limitElement_.getLimitExpr() != null; }
+  public String getOrigSqlString() { return origSqlString_; }
+  public boolean isRuntimeScalar() { return isRuntimeScalar_; }
+  public void setIsRuntimeScalar(boolean isRuntimeScalar) {
+    isRuntimeScalar_ = isRuntimeScalar;
+  }
   public long getLimit() { return limitElement_.getLimit(); }
   public boolean hasOffset() { return limitElement_.getOffsetExpr() != null; }
   public long getOffset() { return limitElement_.getOffset(); }
@@ -446,6 +461,8 @@ public abstract class QueryStmt extends StatementBase {
     sortInfo_ = (other.sortInfo_ != null) ? other.sortInfo_.clone() : null;
     analyzer_ = other.analyzer_;
     evaluateOrderBy_ = other.evaluateOrderBy_;
+    origSqlString_ = other.origSqlString_;
+    isRuntimeScalar_ = other.isRuntimeScalar_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 71e7e8e..ecc785c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -255,6 +255,7 @@ public class SelectStmt extends QueryStmt {
 
     // Remember the SQL string before inline-view expression substitution.
     sqlString_ = toSql();
+    if (origSqlString_ == null) origSqlString_ = sqlString_;
     resolveInlineViewRefs(analyzer);
 
     // If this block's select-project-join portion returns an empty result set and the
@@ -1066,8 +1067,9 @@ public class SelectStmt extends QueryStmt {
    * result set also depends on the data a stmt is processing.
    */
   public boolean returnsSingleRow() {
+    Preconditions.checkState(isAnalyzed());
     // limit 1 clause
-    if (limitElement_ != null && limitElement_.getLimit() == 1) return true;
+    if (limitElement_ != null && hasLimit() && limitElement_.getLimit() == 1) return true;
     // No from clause (base tables or inline views)
     if (fromClause_.isEmpty()) return true;
     // Aggregation with no group by and no DISTINCT

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
index a0eb757..6cfbd20 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
@@ -217,7 +217,7 @@ public class StmtRewriter {
       }
       if (!(conjunct instanceof InPredicate) && !(conjunct instanceof ExistsPredicate) &&
           !(conjunct instanceof BinaryPredicate) &&
-          !conjunct.contains(Expr.IS_SCALAR_SUBQUERY)) {
+          !conjunct.getSubquery().getType().isScalarType()) {
         throw new AnalysisException("Non-scalar subquery is not supported in " +
             "expression: " + conjunct.toSql());
       }
@@ -457,8 +457,9 @@ public class StmtRewriter {
     Preconditions.checkNotNull(expr);
     Preconditions.checkNotNull(analyzer);
     boolean updateSelectList = false;
-
     SelectStmt subqueryStmt = (SelectStmt)expr.getSubquery().getStatement();
+    boolean isScalarSubquery = expr.getSubquery().isScalarSubquery();
+    boolean isRuntimeScalar = subqueryStmt.isRuntimeScalar();
     // Create a new inline view from the subquery stmt. The inline view will be added
     // to the stmt's table refs later. Explicitly set the inline view's column labels
     // to eliminate any chance that column aliases from the parent query could reference
@@ -478,10 +479,13 @@ public class StmtRewriter {
       // safely remove it.
       subqueryStmt.limitElement_ = new LimitElement(null, null);
     }
+    // If runtime scalar, we need to prevent the propagation of predicates into the
+    // inline view by setting a limit on the statement.
+    if (isRuntimeScalar) subqueryStmt.setLimit(2);
 
     // Update the subquery's select list and/or its GROUP BY clause by adding
     // exprs from the extracted correlated predicates.
-    boolean updateGroupBy = expr.getSubquery().isScalarSubquery()
+    boolean updateGroupBy = isScalarSubquery
         || (expr instanceof ExistsPredicate
             && !subqueryStmt.getSelectList().isDistinct()
             && subqueryStmt.hasAggInfo());
@@ -610,14 +614,15 @@ public class StmtRewriter {
       // TODO: Remove this when independent subquery evaluation is implemented.
       // TODO: Requires support for non-equi joins.
       boolean hasGroupBy = ((SelectStmt) inlineView.getViewStmt()).hasGroupByClause();
-      if (!expr.getSubquery().isScalarSubquery() ||
-          (!(hasGroupBy && stmt.selectList_.isDistinct()) && hasGroupBy)) {
+      Subquery subquery = expr.getSubquery();
+      if ((!isScalarSubquery && !isRuntimeScalar) ||
+          (hasGroupBy && !stmt.selectList_.isDistinct())) {
         throw new AnalysisException("Unsupported predicate with subquery: " +
             expr.toSql());
       }
 
       // TODO: Requires support for null-aware anti-join mode in nested-loop joins
-      if (expr.getSubquery().isScalarSubquery() && expr instanceof InPredicate
+      if (isScalarSubquery && expr instanceof InPredicate
           && ((InPredicate) expr).isNotIn()) {
         throw new AnalysisException("Unsupported NOT IN predicate with subquery: " +
             expr.toSql());
@@ -796,7 +801,12 @@ public class StmtRewriter {
       throw new AnalysisException("Unsupported correlated subquery with grouping " +
           "and/or aggregation: " + stmt.toSql());
     }
-
+    // TODO: instead of this check, implement IMPALA-6315
+    if (!expr.getSubquery().isScalarSubquery() &&
+        !(expr instanceof InPredicate || expr instanceof ExistsPredicate)) {
+      throw new AnalysisException(
+          "Unsupported correlated subquery with runtime scalar check: " + stmt.toSql());
+    }
     // The following correlated subqueries with a limit clause are supported:
     // 1. EXISTS subqueries
     // 2. Scalar subqueries with aggregation
@@ -1016,16 +1026,12 @@ public class StmtRewriter {
       pred.analyze(analyzer);
       return pred;
     }
-    // Only scalar subqueries are supported
     Subquery subquery = exprWithSubquery.getSubquery();
-    if (!subquery.isScalarSubquery()) {
-      throw new AnalysisException("Unsupported predicate with a non-scalar subquery: "
-          + subquery.toSql());
-    }
+    Preconditions.checkState(subquery.getType().isScalarType());
     ExprSubstitutionMap smap = new ExprSubstitutionMap();
     SelectListItem item =
       ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
-    if (isCorrelated && !item.getExpr().contains(Expr.IS_BUILTIN_AGG_FN)) {
+    if (isCorrelated && item.getExpr().contains(Expr.IS_UDA_FN)) {
       throw new AnalysisException("UDAs are not supported in the select list of " +
           "correlated subqueries: " + subquery.toSql());
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/Subquery.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Subquery.java b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
index ca15020..d5c2e68 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Subquery.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
@@ -45,6 +45,7 @@ public class Subquery extends Expr {
 
   public Analyzer getAnalyzer() { return analyzer_; }
   public QueryStmt getStatement() { return stmt_; }
+
   @Override
   public String toSqlImpl() { return "(" + stmt_.toSql() + ")"; }
 
@@ -92,8 +93,8 @@ public class Subquery extends Expr {
       type_ = createStructTypeFromExprList();
     }
 
-    // If the subquery returns many rows, set its type to ArrayType.
-    if (!((SelectStmt)stmt_).returnsSingleRow()) type_ = new ArrayType(type_);
+    // If the subquery can return many rows, do the cardinality check at runtime.
+    if (!((SelectStmt)stmt_).returnsSingleRow()) stmt_.setIsRuntimeScalar(true);
 
     Preconditions.checkNotNull(type_);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
index bb472a2..1e58fad 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
@@ -219,6 +219,7 @@ public class UnionStmt extends QueryStmt {
 
     // Remember the SQL string before unnesting operands.
     toSqlString_ = toSql();
+    if (origSqlString_ == null) origSqlString_ = toSqlString_;
 
     // Unnest the operands before casting the result exprs. Unnesting may add
     // additional entries to operands_ and the result exprs of those unnested

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
new file mode 100644
index 0000000..94f3dd1
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TCardinalityCheckNode;
+import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TPlanNode;
+import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Node that returns an error if its child produces more than a single row.
+ * If successful, this node returns a deep copy of its single input row.
+ *
+ * Note that this node must be a blocking node. It would be incorrect to return rows
+ * before the single row constraint has been validated because downstream exec nodes
+ * might produce results and incorrectly return them to the client. If the child of this
+ * node produces more than one row it means the SQL query is semantically invalid, so no
+ * rows must be returned to the client.
+ */
+public class CardinalityCheckNode extends PlanNode {
+  private final String displayStatement_;
+
+  protected CardinalityCheckNode(PlanNodeId id, PlanNode child, String displayStmt) {
+    super(id, "CARDINALITY CHECK");
+    Preconditions.checkState(child.getLimit() == 2);
+    cardinality_ = 1;
+    limit_ = 1;
+    displayStatement_ = displayStmt;
+    addChild(child);
+    computeTupleIds();
+  }
+
+  /**
+   * Same as PlanNode.init(), except we don't assign conjuncts.
+   */
+  @Override
+  public void init(Analyzer analyzer) throws ImpalaException {
+    computeStats(analyzer);
+    createDefaultSmap(analyzer);
+  }
+
+  @Override
+  public boolean isBlockingNode() { return true; }
+
+  @Override
+  public void computeTupleIds() {
+    clearTupleIds();
+    tblRefIds_.addAll(getChild(0).getTblRefIds());
+    tupleIds_.addAll(getChild(0).getTupleIds());
+    nullableTupleIds_.addAll(getChild(0).getNullableTupleIds());
+  }
+
+  @Override
+  protected void toThrift(TPlanNode msg) {
+    msg.node_type = TPlanNodeType.CARDINALITY_CHECK_NODE;
+    TCardinalityCheckNode cardinalityCheckNode = new TCardinalityCheckNode(
+        displayStatement_);
+    msg.setCardinality_check_node(cardinalityCheckNode);
+  }
+
+  @Override
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
+  }
+
+  @Override
+  protected String getNodeExplainString(String prefix, String detailPrefix,
+      TExplainLevel detailLevel) {
+    return String.format("%s%s:%s\n", prefix, id_.toString(), displayName_);
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index b388673..ef24f6c 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -143,6 +143,8 @@ public class DistributedPlanner {
     } else if (root instanceof EmptySetNode) {
       result = new PlanFragment(
           ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
+    } else if (root instanceof CardinalityCheckNode) {
+      result = createCardinalityCheckNodeFragment((CardinalityCheckNode) root, childFragments);
     } else {
       throw new InternalException("Cannot create plan fragment for this node type: "
           + root.getExplainString(ctx_.getQueryOptions()));
@@ -728,6 +730,25 @@ public class DistributedPlanner {
   }
 
   /**
+   * Adds the CardinalityCheckNode as the new plan root to the child fragment and returns
+   * the child fragment.
+   */
+  private PlanFragment createCardinalityCheckNodeFragment(
+      CardinalityCheckNode cardinalityCheckNode,
+      ArrayList<PlanFragment> childFragments) throws ImpalaException {
+    PlanFragment childFragment = childFragments.get(0);
+    // The cardinality check must execute on a single node.
+    if (childFragment.getOutputPartition().isPartitioned()) {
+      childFragment = createMergeFragment(childFragment);
+    }
+    // Set the child explicitly, an ExchangeNode might have been inserted
+    // (whereas cardinalityCheckNode.child[0] would point to the original child)
+    cardinalityCheckNode.setChild(0, childFragment.getPlanRoot());
+    childFragment.setPlanRoot(cardinalityCheckNode);
+    return childFragment;
+  }
+
+  /**
    * Replace node's child at index childIdx with an ExchangeNode that receives its
    * input from childFragment. ParentFragment contains node and the new ExchangeNode.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 27d293d..fb669c5 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1098,6 +1098,14 @@ public class SingleNodePlanner {
     // Set output smap of rootNode *before* creating a SelectNode for proper resolution.
     rootNode.setOutputSmap(outputSmap);
 
+    // Add runtime cardinality check if needed
+    if (inlineViewRef.getViewStmt().isRuntimeScalar()) {
+      rootNode = new CardinalityCheckNode(ctx_.getNextNodeId(), rootNode,
+          inlineViewRef.getViewStmt().getOrigSqlString());
+      rootNode.setOutputSmap(outputSmap);
+      rootNode.init(ctx_.getRootAnalyzer());
+    }
+
     // If the inline view has a LIMIT/OFFSET or unassigned conjuncts due to analytic
     // functions, we may have conjuncts that need to be assigned to a SELECT node on
     // top of the current plan root node.

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index e8d42e5..f591927 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -3612,7 +3612,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
    */
   @Test
   public void TestClone() {
-    testNumberOfMembers(QueryStmt.class, 9);
+    testNumberOfMembers(QueryStmt.class, 11);
     testNumberOfMembers(UnionStmt.class, 9);
     testNumberOfMembers(ValuesStmt.class, 0);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index de8632e..06e834d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -938,26 +938,11 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
       }
     }
 
-    // Subquery returns multiple rows
-    AnalysisError("select * from functional.alltypestiny where " +
-        "(select max(id) from functional.alltypes) = " +
-        "(select id from functional.alltypestiny)",
-        "Subquery must return a single row: " +
-        "(SELECT id FROM functional.alltypestiny)");
-    AnalysisError("select id from functional.alltypestiny t where int_col = " +
-        "(select int_col from functional.alltypessmall limit 2)",
-        "Subquery must return a single row: " +
-        "(SELECT int_col FROM functional.alltypessmall LIMIT 2)");
-    AnalysisError("select id from functional.alltypestiny where int_col = " +
-        "(select id from functional.alltypessmall)",
-        "Subquery must return a single row: " +
-        "(SELECT id FROM functional.alltypessmall)");
-
     // Subquery returns multiple columns
     AnalysisError("select id from functional.alltypestiny where int_col = " +
         "(select id, int_col from functional.alltypessmall)",
-        "Subquery must return a single row: " +
-        "(SELECT id, int_col FROM functional.alltypessmall)");
+        "operands of type INT and STRUCT<id:INT,int_col:INT> are not " +
+        "comparable: int_col = (SELECT id, int_col FROM functional.alltypessmall)");
     AnalysisError("select * from functional.alltypestiny where id in " +
         "(select * from (values(1,2)) as t)",
         "Subquery must return a single column: (SELECT * FROM (VALUES(1, 2)) t)");
@@ -965,9 +950,9 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
     // Subquery returns multiple columns due to a group by clause
     AnalysisError("select id from functional.alltypestiny where int_col = " +
         "(select int_col, count(*) from functional.alltypessmall group by int_col)",
-        "Subquery must return a single row: " +
-        "(SELECT int_col, count(*) FROM functional.alltypessmall " +
-        "GROUP BY int_col)");
+        "operands of type INT and STRUCT<int_col:INT,_1:BIGINT> are not " +
+        "comparable: int_col = (SELECT int_col, count(*) FROM " +
+        "functional.alltypessmall GROUP BY int_col)");
 
     // Outer join with a table from the outer block using an explicit alias
     AnalysisError("select id from functional.alltypestiny t where int_col = " +
@@ -1083,12 +1068,10 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
         "g.int_col = t.int_col limit 1)");
 
     // Aggregate subquery with analytic function
-    AnalysisError("select id, int_col, bool_col from " +
+    AnalyzesOk("select id, int_col, bool_col from " +
       "functional.alltypestiny t1 where int_col = (select min(bigint_col) " +
       "over (partition by bool_col) from functional.alltypessmall t2 where " +
-      "int_col < 10)", "Subquery must return a single row: (SELECT " +
-      "min(bigint_col) OVER (PARTITION BY bool_col) FROM " +
-      "functional.alltypessmall t2 WHERE int_col < 10)");
+      "int_col < 10)");
 
     // Aggregate subquery with analytic function + limit 1 and a relative table ref
     // TODO: Modify the StmtRewriter to allow this query with only relative refs.
@@ -1114,10 +1097,8 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
       "int_col < 10 limit 1)");
 
     // Subquery with distinct in binary predicate
-    AnalysisError("select * from functional.alltypes where int_col = " +
-        "(select distinct int_col from functional.alltypesagg)", "Subquery " +
-        "must return a single row: (SELECT DISTINCT int_col FROM " +
-        "functional.alltypesagg)");
+    AnalyzesOk("select * from functional.alltypes where int_col = " +
+        "(select distinct int_col from functional.alltypesagg)");
     AnalyzesOk("select * from functional.alltypes where int_col = " +
         "(select count(distinct int_col) from functional.alltypesagg)");
     // Multiple count aggregate functions in a correlated subquery's select list
@@ -1179,6 +1160,29 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
         "(select t.* from functional.alltypes)",
         "Could not resolve star expression: 't.*'");
 
+    // Scalar subquery check is done at runtime, not during analysis
+    for (String cmpOp: cmpOperators) {
+      AnalyzesOk(String.format(
+          "select id from functional.alltypestiny t where int_col %s " +
+          "(select int_col from functional.alltypessmall)", cmpOp));
+      AnalyzesOk(String.format(
+          "select id from functional.alltypestiny t where int_col %s " +
+          "(select int_col from functional.alltypessmall where id = 1)", cmpOp));
+      AnalyzesOk(String.format(
+          "select id from functional.alltypestiny t where int_col %s " +
+          "1 - (select int_col from functional.alltypessmall where id = 1)", cmpOp));
+      AnalyzesOk(String.format(
+          "select id from functional.alltypestiny t where int_col %s " +
+          "1 - (select int_col from functional.alltypessmall limit 10)", cmpOp));
+      AnalyzesOk(String.format(
+          "select id from functional.alltypestiny t where int_col %s " +
+          "(select int_col from functional.alltypessmall) * 7", cmpOp));
+    }
+    AnalysisError("select * from functional.alltypes t1 where id < " +
+        "(select id from functional.alltypes t2 where t1.int_col = t2.int_col)",
+        "Unsupported correlated subquery with runtime scalar check: " +
+        "SELECT id FROM functional.alltypes t2 WHERE t1.int_col = t2.int_col");
+
     // Test resolution of correlated table references inside subqueries. The testing
     // here is rather basic, because the analysis goes through the same codepath
     // as the analysis of correlated inline views, which are more thoroughly tested.
@@ -1396,5 +1400,16 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
     AnalyzesOk("select * from functional.alltypestiny a where " +
         "double_col between cast(1 as double) and cast(10 as double) and " +
         "exists (select 1 from functional.alltypessmall b where a.id = b.id)");
+
+    AnalyzesOk("select count(1) from functional.alltypes " +
+        "where (select int_col > 10 from functional.alltypes)");
+    AnalyzesOk("select count(1) from functional.alltypes " +
+        "where (select string_col is null from functional.alltypes)");
+    AnalyzesOk("select count(1) from functional.alltypes " +
+        "where (select int_col from functional.alltypes) is null");
+    AnalyzesOk("select count(1) from functional.alltypes " +
+        "where (select int_col from functional.alltypes) is not null");
+    AnalyzesOk("select 1 from functional.alltypes where " +
+        "coalesce(null, (select bool_col from functional.alltypes where id = 0))");
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 760ea59..0a073eb 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -79,7 +79,11 @@ public class ToSqlTest extends FrontendTestBase {
     String actual = null;
     try {
       ParseNode node = AnalyzesOk(query, createAnalysisCtx(defaultDb));
-      actual = node.toSql();
+      if (node instanceof QueryStmt) {
+        actual = ((QueryStmt)node).getOrigSqlString();
+      } else {
+        actual = node.toSql();
+      }
       if (ignoreWhitespace) {
         // Transform whitespace to single space.
         actual = actual.replace('\n', ' ').replaceAll(" +", " ").trim();
@@ -886,9 +890,6 @@ public class ToSqlTest extends FrontendTestBase {
   /**
    * Tests that toSql() properly handles subqueries in the where clause.
    */
-  // TODO Fix testToSql to print the stmt after the first analysis phase and not
-  // after the rewrite.
-  @Ignore("Prints the rewritten statement")
   @Test
   public void subqueryTest() {
     // Nested predicates
@@ -912,13 +913,6 @@ public class ToSqlTest extends FrontendTestBase {
             "(select * from functional.alltypestiny)",
         "SELECT * FROM functional.alltypes WHERE NOT EXISTS " +
             "(SELECT * FROM functional.alltypestiny)");
-    // Multiple nested predicates in the WHERE clause
-    testToSql("select * from functional.alltypes where not (id < 10 and " +
-            "(int_col in (select int_col from functional.alltypestiny)) and " +
-            "(string_col = (select max(string_col) from functional.alltypestiny)))",
-        "SELECT * FROM functional.alltypes WHERE NOT (id < 10 AND " +
-            "(int_col IN (SELECT int_col FROM functional.alltypestiny)) AND " +
-        "(string_col = (SELECT max(string_col) FROM functional.alltypestiny)))");
     // Multiple nesting levels
     testToSql("select * from functional.alltypes where id in " +
         "(select id from functional.alltypestiny where int_col = " +

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
index 3646146..bf7f8b8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
@@ -1987,3 +1987,69 @@ PLAN-ROOT SINK
    partitions=1/1 files=1 size=4.18KB
    runtime filters: RF000 -> t1.pos
 ====
+# Add run time scalar subquery check for uncorrelated subqueries
+# Create CardinalityCheckNode inside a subplan
+select c_custkey
+from tpch_nested_parquet.customer c
+where c_custkey < (select o_orderkey
+                   from c.c_orders
+                   where o_orderkey = 6000000)
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+|
+|--05:NESTED LOOP JOIN [RIGHT SEMI JOIN]
+|  |  join predicates: c_custkey < o_orderkey
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |
+|  04:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  03:UNNEST [c.c_orders]
+|     limit: 2
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.35MB
+   predicates on c_orders: o_orderkey = 6000000
+====
+# CardinalityCheckNode in subplan in a subplan
+select c_custkey
+from tpch_nested_parquet.customer c
+where c_custkey < (select o_orderkey
+                   from c.c_orders co
+                   where o_orderkey = (select li.l_linenumber
+                                       from co.o_lineitems li))
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+|
+|--10:NESTED LOOP JOIN [RIGHT SEMI JOIN]
+|  |  join predicates: c_custkey < o_orderkey
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |
+|  09:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  04:SUBPLAN
+|  |  limit: 2
+|  |
+|  |--08:NESTED LOOP JOIN [RIGHT SEMI JOIN]
+|  |  |  join predicates: li.l_linenumber = o_orderkey
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |
+|  |  07:CARDINALITY CHECK
+|  |  |  limit: 1
+|  |  |
+|  |  06:UNNEST [co.o_lineitems li]
+|  |     limit: 2
+|  |
+|  03:UNNEST [c.c_orders co]
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.35MB
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index ea22679..796ccd8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -2338,4 +2338,426 @@ PLAN-ROOT SINK
 01:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
    predicates: id < 5
+=======
+# Subquery in binary predicate that needs cardinality check at runtime
+select bigint_col from functional.alltypes where id =
+  (select id
+   from functional.alltypes where id = 1
+  )
+---- PLAN
+PLAN-ROOT SINK
+|
+03:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: id = id
+|  runtime filters: RF000 <- id
+|
+|--02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: id = 1
+|     limit: 2
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: functional.alltypes.id = 1
+   runtime filters: RF000 -> id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+03:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
+|  hash predicates: id = id
+|  runtime filters: RF000 <- id
+|
+|--06:EXCHANGE [HASH(id)]
+|  |
+|  02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  04:EXCHANGE [UNPARTITIONED]
+|  |  limit: 2
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: id = 1
+|     limit: 2
+|
+05:EXCHANGE [HASH(id)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: functional.alltypes.id = 1
+   runtime filters: RF000 -> id
+====
+# Subquery in arithmetic expression that needs cardinality check at runtime
+select bigint_col from functional.alltypes where id =
+  3 * (select id
+   from functional.alltypes where id = 1
+  )
+---- PLAN
+PLAN-ROOT SINK
+|
+03:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: id = 3 * id
+|  runtime filters: RF000 <- 3 * id
+|
+|--02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: id = 1
+|     limit: 2
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+03:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: id = 3 * id
+|  runtime filters: RF000 <- 3 * id
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  04:EXCHANGE [UNPARTITIONED]
+|  |  limit: 2
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: id = 1
+|     limit: 2
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> id
+====
+# Subquery that contains union and needs cardinality check at runtime
+select * from functional.alltypes where id =
+  (select i from (select bigint_col as i from functional.alltypes
+                  union
+                  select smallint_col as i from functional.alltypes) t)
+---- PLAN
+PLAN-ROOT SINK
+|
+06:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: id = i
+|  runtime filters: RF000 <- i
+|
+|--05:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  04:AGGREGATE [FINALIZE]
+|  |  group by: i
+|  |  limit: 2
+|  |
+|  01:UNION
+|  |  pass-through-operands: 02
+|  |
+|  |--03:SCAN HDFS [functional.alltypes]
+|  |     partitions=24/24 files=24 size=478.45KB
+|  |
+|  02:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+11:EXCHANGE [UNPARTITIONED]
+|
+06:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: id = i
+|  runtime filters: RF000 <- i
+|
+|--10:EXCHANGE [BROADCAST]
+|  |
+|  05:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  09:EXCHANGE [UNPARTITIONED]
+|  |  limit: 2
+|  |
+|  08:AGGREGATE [FINALIZE]
+|  |  group by: i
+|  |  limit: 2
+|  |
+|  07:EXCHANGE [HASH(i)]
+|  |
+|  04:AGGREGATE [STREAMING]
+|  |  group by: i
+|  |
+|  01:UNION
+|  |  pass-through-operands: 02
+|  |
+|  |--03:SCAN HDFS [functional.alltypes]
+|  |     partitions=24/24 files=24 size=478.45KB
+|  |
+|  02:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> id
+====
+# Subquery that contains join and GROUP BY and needs cardinality check at runtime
+select * from functional.alltypes where id =
+  (select max(allt.smallint_col) from functional.alltypes allt, functional.alltypesagg ata
+   where allt.id = ata.id and ata.month = 1 group by ata.month)
+---- PLAN
+PLAN-ROOT SINK
+|
+06:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: id = max(allt.smallint_col)
+|  runtime filters: RF000 <- max(allt.smallint_col)
+|
+|--05:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  04:AGGREGATE [FINALIZE]
+|  |  output: max(allt.smallint_col)
+|  |  group by: ata.month
+|  |  limit: 2
+|  |
+|  03:HASH JOIN [INNER JOIN]
+|  |  hash predicates: ata.id = allt.id
+|  |  runtime filters: RF002 <- allt.id
+|  |
+|  |--01:SCAN HDFS [functional.alltypes allt]
+|  |     partitions=24/24 files=24 size=478.45KB
+|  |
+|  02:SCAN HDFS [functional.alltypesagg ata]
+|     partitions=11/11 files=11 size=814.73KB
+|     runtime filters: RF002 -> ata.id
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:EXCHANGE [UNPARTITIONED]
+|
+06:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: id = max(allt.smallint_col)
+|  runtime filters: RF000 <- max(allt.smallint_col)
+|
+|--12:EXCHANGE [BROADCAST]
+|  |
+|  05:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  11:EXCHANGE [UNPARTITIONED]
+|  |  limit: 2
+|  |
+|  10:AGGREGATE [FINALIZE]
+|  |  output: max:merge(allt.smallint_col)
+|  |  group by: ata.month
+|  |  limit: 2
+|  |
+|  09:EXCHANGE [HASH(ata.month)]
+|  |
+|  04:AGGREGATE [STREAMING]
+|  |  output: max(allt.smallint_col)
+|  |  group by: ata.month
+|  |
+|  03:HASH JOIN [INNER JOIN, PARTITIONED]
+|  |  hash predicates: ata.id = allt.id
+|  |  runtime filters: RF002 <- allt.id
+|  |
+|  |--08:EXCHANGE [HASH(allt.id)]
+|  |  |
+|  |  01:SCAN HDFS [functional.alltypes allt]
+|  |     partitions=24/24 files=24 size=478.45KB
+|  |
+|  07:EXCHANGE [HASH(ata.id)]
+|  |
+|  02:SCAN HDFS [functional.alltypesagg ata]
+|     partitions=11/11 files=11 size=814.73KB
+|     runtime filters: RF002 -> ata.id
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> id
+====
+# IS NULL predicate must not be pushed down to the scan node of the inline view.
+select count(1) from functional.alltypes
+where (select int_col from functional.alltypes) is null
+---- PLAN
+PLAN-ROOT SINK
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+04:NESTED LOOP JOIN [CROSS JOIN]
+|
+|--03:SELECT
+|  |  predicates: int_col IS NULL
+|  |
+|  02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     limit: 2
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+09:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+05:AGGREGATE
+|  output: count(*)
+|
+04:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  03:SELECT
+|  |  predicates: int_col IS NULL
+|  |
+|  02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  06:EXCHANGE [UNPARTITIONED]
+|  |  limit: 2
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     limit: 2
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Binary predicate with constant must not be pushed down
+# to the scan node of the inline view.
+select count(1) from functional.alltypes
+where (select int_col from functional.alltypes) > 10
+---- PLAN
+PLAN-ROOT SINK
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+04:NESTED LOOP JOIN [CROSS JOIN]
+|
+|--03:SELECT
+|  |  predicates: int_col > 10
+|  |
+|  02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     limit: 2
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+09:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+05:AGGREGATE
+|  output: count(*)
+|
+04:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  03:SELECT
+|  |  predicates: int_col > 10
+|  |
+|  02:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  06:EXCHANGE [UNPARTITIONED]
+|  |  limit: 2
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     limit: 2
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Runtime scalar subquery with offset.
+select count(*) from functional.alltypes
+where 7 = (select id from functional.alltypestiny
+           order by id limit 8 offset 7)
+---- PLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+05:NESTED LOOP JOIN [CROSS JOIN]
+|
+|--04:SELECT
+|  |  predicates: id = 7
+|  |
+|  03:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  02:TOP-N [LIMIT=2 OFFSET=7]
+|  |  order by: id ASC
+|  |
+|  01:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE
+|  output: count(*)
+|
+05:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  04:SELECT
+|  |  predicates: id = 7
+|  |
+|  03:CARDINALITY CHECK
+|  |  limit: 1
+|  |
+|  07:MERGING-EXCHANGE [UNPARTITIONED]
+|  |  offset: 7
+|  |  order by: id ASC
+|  |  limit: 2
+|  |
+|  02:TOP-N [LIMIT=9]
+|  |  order by: id ASC
+|  |
+|  01:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
index 82930ad..162f4be 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
@@ -670,3 +670,41 @@ where c_custkey < 10 and c_custkey in
 ---- TYPES
 bigint
 ====
+---- QUERY
+# Create CardinalityCheckNode inside a subplan
+select c_custkey
+from customer c
+where c_custkey < (select o_orderkey
+                   from c.c_orders
+                   where o_orderkey = 6000000)
+---- RESULTS
+110063
+---- TYPES
+bigint
+====
+---- QUERY
+# Create CardinalityCheckNode inside a subplan
+# o_orderkey 6000000 and 5000000 belong to different customers
+select c_custkey
+from customer c
+where c_custkey < (select o_orderkey
+                   from c.c_orders
+                   where o_orderkey = 6000000 or o_orderkey = 5000000)
+order by c_custkey
+---- RESULTS
+24325
+110063
+---- TYPES
+bigint
+====
+---- QUERY
+# Create CardinalityCheckNode inside a subplan.
+# o_orderkey 6000000 and 4285920 belong to the same customer
+select c_custkey
+from customer c
+where c_custkey < (select o_orderkey
+                   from c.c_orders
+                   where o_orderkey = 6000000 or o_orderkey = 4285920)
+---- CATCH
+Subquery must not return more than one row: SELECT o_orderkey FROM c.c_orders WHERE o_orderkey = 6000000 OR o_orderkey = 4285920
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/6472ccdd/testdata/workloads/functional-query/queries/QueryTest/subquery.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/subquery.test b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
index 4aaa665..2d691ed 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/subquery.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
@@ -847,6 +847,177 @@ WHERE EXISTS
    ORDER BY id LIMIT 10 OFFSET 6)
 ---- RESULTS
 0
+====
+---- QUERY
+# Uncorrelated subquery in binary predicate that returns scalar value at runtime
+SELECT id FROM functional.alltypessmall
+WHERE int_col =
+  (SELECT int_col
+   FROM functional.alltypessmall
+   WHERE id = 1)
+ORDER BY id
+---- RESULTS
+1
+11
+21
+26
+36
+46
+51
+61
+71
+76
+86
+96
+---- TYPES
+INT
+====
+---- QUERY
+# Uncorrelated subquery in arithmetic expr that returns scalar value at runtime
+SELECT id FROM functional.alltypessmall
+WHERE int_col =
+  3 * (SELECT int_col
+       FROM functional.alltypessmall
+       WHERE id = 1)
+ORDER BY id
+---- RESULTS
+3
+13
+23
+28
+38
+48
+53
+63
+73
+78
+88
+98
+---- TYPES
+INT
+====
+---- QUERY
+# Uncorrelated subquery in binary predicate that returns no rows.
+SELECT id FROM functional.alltypessmall
+WHERE int_col =
+  (SELECT int_col
+   FROM functional.alltypessmall
+   WHERE id = -123)
+ORDER BY id
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Uncorrelated subquery in arithmetic expr that returns no rows.
+SELECT id FROM functional.alltypessmall
+WHERE int_col =
+  3 * (SELECT int_col
+       FROM functional.alltypessmall
+       WHERE id = -123)
+ORDER BY id
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Uncorrelated subquery in binary predicate that returns multiple rows
+SELECT id FROM functional.alltypessmall
+WHERE int_col =
+  (SELECT int_col
+   FROM functional.alltypessmall)
+ORDER BY id
+---- RESULTS
+---- CATCH
+Query aborted:Subquery must not return more than one row:
+====
+---- QUERY
+# Uncorrelated subquery in arithmetic expr that returns multiple rows
+SELECT id FROM functional.alltypessmall
+WHERE int_col =
+  3 * (SELECT int_col
+       FROM functional.alltypessmall)
+ORDER BY id
+---- RESULTS
+---- CATCH
+Query aborted:Subquery must not return more than one row:
+====
+---- QUERY
+# Uncorrelated subquery in binary predicate that returns scalar value at runtime
+SELECT count(id) FROM functional.alltypes
+WHERE int_col =
+  (SELECT int_col
+   FROM functional.alltypessmall
+   WHERE id = 1)
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Uncorrelated subquery in arithmetic expr that returns scalar value at runtime
+SELECT count(id) FROM functional.alltypes
+WHERE int_col =
+  3 * (SELECT int_col
+       FROM functional.alltypessmall
+       WHERE id = 1)
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Uncorrelated subquery in binary predicate that returns scalar value at runtime
+# executed on a single node.
+set num_nodes=1;
+SELECT count(id) FROM functional.alltypes
+WHERE int_col =
+  (SELECT int_col
+   FROM functional.alltypessmall
+   WHERE id = 1)
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Uncorrelated subquery in arithmetic expr that returns scalar value at runtime
+# executed on a single node.
+set num_nodes=1;
+SELECT count(id) FROM functional.alltypes
+WHERE int_col =
+  3 * (SELECT int_col
+       FROM functional.alltypessmall
+       WHERE id = 1)
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Subquery that returns more than one row
+SELECT a FROM (values(1 a),(2),(3)) v
+WHERE a = (SELECT x FROM (values(1 x),(2),(3)) v)
+---- RESULTS
+---- CATCH
+Query aborted:Subquery must not return more than one row:
+====
+---- QUERY
+# Subquery that returns more than one row
+# The error message must not reveal the definition of functional.alltypes_view
+SELECT id FROM functional.alltypes
+WHERE id = (SELECT bigint_col FROM functional.alltypes_view)
+---- RESULTS
+---- CATCH
+Query aborted:Subquery must not return more than one row: SELECT bigint_col FROM functional.alltypes_view
+====
+---- QUERY
+# Runtime scalar subquery with offset.
+select count(*) from functional.alltypes
+where 7 = (select id from functional.alltypestiny
+           order by id limit 8 offset 7)
+---- RESULTS
+7300
 ---- TYPES
 BIGINT
 ====