You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/05 14:52:17 UTC

[impala] branch master updated (0a2261f -> f9ced75)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 0a2261f  IMPALA-8092: Add an admission controller debug page
     new 781cb91  IMPALA-7920: [DOCS] Document the LEVENSHTEIN function
     new 26b8b42  IMPALA-7565: Set TAcceptQueueServer connection_setup_pool to be multi-threaded by default
     new b08c8e3  IMPALA-8034: Improve planner tests
     new b7bd6ac  IMPALA-8093: Prefix time series counters with a hyphen
     new 3db3b3b  IMPALA-8150: Fix buggy AuditingTest::TestAccessEventsOnAuthFailure
     new f9ced75  IMPALA-7999: clean up start-*d.sh scripts

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/rpc/TAcceptQueueServer.cpp                  |  13 +-
 be/src/transport/TSaslServerTransport.cpp          |   1 +
 be/src/util/runtime-profile.cc                     |   4 +-
 bin/start-catalogd.sh                              |  75 --
 bin/{run-jdbc-client.sh => start-daemon.sh}        |  14 +-
 bin/start-impala-cluster.py                        | 129 +--
 bin/start-impalad.sh                               | 103 ---
 bin/start-statestored.sh                           |  64 --
 docs/topics/impala_string_functions.xml            |  55 ++
 .../impala/authorization/AuthorizationConfig.java  |   3 +-
 .../org/apache/impala/service/JniFrontend.java     |   1 -
 .../org/apache/impala/analysis/AuditingTest.java   |   6 +-
 .../apache/impala/analysis/AuthorizationTest.java  |  88 +-
 .../org/apache/impala/common/FrontendTestBase.java |   1 -
 .../org/apache/impala/planner/PlannerTest.java     |  32 +
 .../org/apache/impala/util/SentryProxyTest.java    |   1 -
 .../queries/PlannerTest/card-inner-join.test       | 985 +++++++++++++++++++++
 .../queries/PlannerTest/card-multi-join.test       | 275 ++++++
 .../queries/PlannerTest/card-outer-join.test       | 812 +++++++++++++++++
 .../queries/PlannerTest/card-scan.test             | 442 +++++++++
 tests/common/custom_cluster_test_suite.py          |   2 +-
 tests/common/impala_cluster.py                     |  66 +-
 tests/custom_cluster/test_breakpad.py              |   4 +-
 tests/custom_cluster/test_redaction.py             |  10 +-
 tests/custom_cluster/test_scratch_disk.py          |  10 +-
 tests/query_test/test_observability.py             |   5 +
 26 files changed, 2797 insertions(+), 404 deletions(-)
 delete mode 100755 bin/start-catalogd.sh
 copy bin/{run-jdbc-client.sh => start-daemon.sh} (64%)
 delete mode 100755 bin/start-impalad.sh
 delete mode 100755 bin/start-statestored.sh
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/card-inner-join.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/card-multi-join.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/card-outer-join.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/card-scan.test


[impala] 04/06: IMPALA-8093: Prefix time series counters with a hyphen

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b7bd6ac44dfbc5f82e5cba52bf7e751f62a35751
Author: Yongzhi Chen <yc...@cloudera.com>
AuthorDate: Tue Jan 29 13:45:44 2019 -0500

    IMPALA-8093: Prefix time series counters with a hyphen
    
    This change makes the counter prefixes in profiles consistent.
    Only TimeSeriesCounters are affected.
    
    Testing:
    Add TimeSeriesCounter prefix checks in test_observability tests.
    Manual Tests: Run query and check profile for MemoryUsage and
    ThreadUsage. The following section of the profile shows that
    TimeSeriesCounters are consistent with other counters:
    
        Fragment F00:
            . . .
            Fragment Instance Lifecycle Event Timeline: 273.841ms
               - Prepare Finished: 1.511ms (1.511ms)
               . . .
             - MemoryUsage(500.000ms): 2.81 MB
             - ThreadUsage(500.000ms): 1
             - AverageThreadTokens: 1.00
             - BloomFilterBytes: 1.00 MB (1048576)
             - ExchangeScanRatio: 0.00
             - PeakMemoryUsage: 54.26 MB (56891933)
             - PeakReservation: 53.00 MB (55574528)
    
    Change-Id: I2e3f08da765b3e6dedead45760729cbc5e8fb6fa
    Reviewed-on: http://gerrit.cloudera.org:8080/12296
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/runtime-profile.cc         | 4 ++--
 tests/query_test/test_observability.py | 5 +++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 9892c28..5696486 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -767,14 +767,14 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
 
   {
     // Print all time series counters as following:
-    // <Name> (<period>): <val1>, <val2>, <etc>
+    //    - <Name> (<period>): <val1>, <val2>, <etc>
     SpinLock* lock;
     int num, period;
     lock_guard<SpinLock> l(counter_map_lock_);
     for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
       const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
       if (num > 0) {
-        stream << prefix << "  " << v.first << "("
+        stream << prefix << "   - " << v.first << "("
                << PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS)
                << "): ";
         for (int i = 0; i < num; ++i) {
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 963ace1..9c85e2e 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -399,6 +399,11 @@ class TestObservability(ImpalaTestSuite):
     query = """select count(*) from tpch_parquet.orders o inner join tpch_parquet.lineitem
         l on o.o_orderkey = l.l_orderkey group by o.o_clerk limit 10"""
     profile = self.execute_query(query).runtime_profile
+
+    # TimeSeriesCounter should be prefixed with a hyphen.
+    assert "  MemoryUsage" not in profile
+    assert "- MemoryUsage" in profile
+
     assert "ExchangeScanRatio: 3.19" in profile
 
     keys = ["TotalBytesSent", "TotalScanBytesSent", "TotalInnerBytesSent"]


[impala] 02/06: IMPALA-7565: Set TAcceptQueueServer connection_setup_pool to be multi-threaded by default

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 26b8b427188052735ce52f319aebe2ff1ac17e2a
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Wed Dec 19 15:41:25 2018 -0800

    IMPALA-7565: Set TAcceptQueueServer connection_setup_pool to be
    multi-threaded by default
    
    Bumping up the thread count for the threads that process the
    post-accept, pre-setup connection queue to 2 in order to minimize the
    chances of a single client holding up others if the thread gets stuck
    in that step. This patch also un-hides the advanced startup flag
    'accepted_cnxn_setup_thread_pool_size'.
    
    Testing:
    - Ran exhaustive tests with a thread pool of 10.
    - Scanned manually through the code and parts of thrift lib to make
      sure the APIs are used in a thread safe manner.
    - Rapidly executed openssl connect-disconnect on the impalad's
      hs2 server port on a thread sanitizer build. No data races were
      flagged by the thread sanitizer.
    - Ran a stress test on an 8 node secure cluster with 100 concurrent
      streams executing the tpch workload with a scale of 5g and a
      connection_setup_pool of 100.
    
    Change-Id: I053120d4c3153ddbe5261acd28388be6cd191908
    Reviewed-on: http://gerrit.cloudera.org:8080/12249
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/TAcceptQueueServer.cpp         | 13 ++++---------
 be/src/transport/TSaslServerTransport.cpp |  1 +
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 730901b..8f4a01a 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -29,11 +29,10 @@ DEFINE_int32(accepted_cnxn_queue_depth, 10000,
     "(Advanced) The size of the post-accept, pre-setup connection queue in each thrift "
     "server set up to service Impala internal and external connections.");
 
-DEFINE_int32_hidden(accepted_cnxn_setup_thread_pool_size, 1,
+DEFINE_int32(accepted_cnxn_setup_thread_pool_size, 2,
     "(Advanced) The size of the thread pool that is used to process the "
     "post-accept, pre-setup connection queue in each thrift server set up to service "
-    "Impala internal and external connections. Warning: This is untested for values "
-    "greater than 1 which might exhibit unpredictable behavior and/or cause crashes.");
+    "Impala internal and external connections.");
 
 namespace apache {
 namespace thrift {
@@ -216,14 +215,10 @@ void TAcceptQueueServer::serve() {
   }
 
   if (FLAGS_accepted_cnxn_setup_thread_pool_size > 1) {
-    LOG(WARNING) << "connection_setup_thread_pool_size is set to "
-                 << FLAGS_accepted_cnxn_setup_thread_pool_size
-                 << ". Values greater than 1 are untested and might exhibit "
-                    "unpredictable behavior and/or cause crashes.";
+    LOG(INFO) << "connection_setup_thread_pool_size is set to "
+              << FLAGS_accepted_cnxn_setup_thread_pool_size;
   }
   // New - this is the thread pool used to process the internal accept queue.
-  // TODO: IMPALA-7565: Make sure the related thrift code is thread safe and subsequently
-  // enable multi-threading by default.
   ThreadPool<shared_ptr<TTransport>> connection_setup_pool("setup-server", "setup-worker",
       FLAGS_accepted_cnxn_setup_thread_pool_size, FLAGS_accepted_cnxn_queue_depth,
       [this](int tid, const shared_ptr<TTransport>& item) {
diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp
index 15d548e..9bd7795 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -177,6 +177,7 @@ boost::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
   socket->setSendTimeout(0);
   {
     lock_guard<mutex> l(transportMap_mutex_);
+    DCHECK(transportMap_.find(trans) == transportMap_.end());
     transportMap_[trans] = ret_transport;
   }
   return ret_transport;


[impala] 01/06: IMPALA-7920: [DOCS] Document the LEVENSHTEIN function

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 781cb915e5bfc01c3fb198cbf195d67f01b90aa1
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Mon Feb 4 15:48:10 2019 -0800

    IMPALA-7920: [DOCS] Document the LEVENSHTEIN function
    
    Change-Id: Iefeb38871f14c87f8cce7feb9197ab0e79429d04
    Reviewed-on: http://gerrit.cloudera.org:8080/12357
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Greg Rahn <gr...@cloudera.com>
---
 docs/topics/impala_string_functions.xml | 55 +++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/docs/topics/impala_string_functions.xml b/docs/topics/impala_string_functions.xml
index a32dbf7..356f700 100644
--- a/docs/topics/impala_string_functions.xml
+++ b/docs/topics/impala_string_functions.xml
@@ -135,6 +135,10 @@ under the License.
       </li>
 
       <li>
+        <xref href="#string_functions/levenshtein">LEVENSHTEIN, LE_DST</xref>
+      </li>
+
+      <li>
         <xref href="#string_functions/locate">LOCATE</xref>
       </li>
 
@@ -767,7 +771,58 @@ select instr('foo bar bletch', 'b', 1, null);
         </dd>
 
       </dlentry>
+    </dl>
+
+    <dl>
+      <dlentry id="levenshtein" rev="3.2">
+
+        <dt>
+          LEVENSHTEIN(STRING str1, STRING str2), <ph id="le_dst"
+            >LE_DST(STRING
+          str1, STRING str2)</ph>
+        </dt>
+
+        <dd>
+          <b>Purpose:</b> Returns the Levenshtein distance between two input strings. The
+          Levenshtein distance between two strings is the minimum number of single-character
+          edits required to transform one string to other. The function indicates how different
+          the input strings are.
+          <p>
+            <b>Return type:</b> <codeph>INT</codeph>
+          </p>
+          <b>Usage notes:</b>
+          <p>
+            If input strings are equal, the function returns 0.
+          </p>
+
+          <p>
+            If either input exceeds 255 characters, the function returns an error.
+          </p>
+
+          <p>
+            If either input string is <codeph>NULL</codeph>, the function returns
+            <codeph>NULL</codeph>.
+          </p>
+
+          <p>
+            If the length of one input string is zero, the function returns the length of the
+            other string.
+          </p>
+
+          <p>
+            <b>Example:</b>
+          </p>
+          <p>
+            <codeph>LEVENSHTEIN ('welcome', 'We come')</codeph> returns 2, first change to
+            replace '<codeph>w</codeph>' to '<codeph>W</codeph>', and then to replace
+            '<codeph>l</codeph>' to a space character.
+          </p>
+        </dd>
+
+      </dlentry>
+    </dl>
 
+    <dl>
       <dlentry id="locate">
 
         <dt>


[impala] 05/06: IMPALA-8150: Fix buggy AuditingTest::TestAccessEventsOnAuthFailure

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3db3b3b0a74d003ef497cd76d3bd66480f7dc4d7
Author: Fredy Wijaya <fw...@cloudera.com>
AuthorDate: Fri Feb 1 11:14:12 2019 -0800

    IMPALA-8150: Fix buggy AuditingTest::TestAccessEventsOnAuthFailure
    
    I was able to reproduce this issue by running AuditingTest individually.
    Running all tests did not seem to reproduce this issue, which may be
    because the test depends on the state from other tests. Looking at the
    code, the test was poorly written, mainly for two reasons:
    - The Sentry config was set to an empty string, which is not a valid
      config file.
    - Calling AuthorizationConfig.validateConfig() was missing.
    
    This patch ensures validateConfig() is always called when an
    AuthorizationConfig instance is created.
    
    Testing:
    - Ran AuditingTest individually
    - Ran all FE tests
    - Ran all E2E authorization tests
    
    Change-Id: I712697e6d5a3e171b259a781bd07de9871a29b95
    Reviewed-on: http://gerrit.cloudera.org:8080/12334
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/authorization/AuthorizationConfig.java  |  3 +-
 .../org/apache/impala/service/JniFrontend.java     |  1 -
 .../org/apache/impala/analysis/AuditingTest.java   |  6 +-
 .../apache/impala/analysis/AuthorizationTest.java  | 88 +++++++++++-----------
 .../org/apache/impala/common/FrontendTestBase.java |  1 -
 .../org/apache/impala/util/SentryProxyTest.java    |  1 -
 6 files changed, 47 insertions(+), 53 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationConfig.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationConfig.java
index 80611d1..96b65d8 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationConfig.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationConfig.java
@@ -51,6 +51,7 @@ public class AuthorizationConfig {
       policyProviderClassName = policyProviderClassName.trim();
     }
     policyProviderClassName_ = policyProviderClassName;
+    validateConfig();
   }
 
   /**
@@ -74,7 +75,7 @@ public class AuthorizationConfig {
    * Validates the authorization configuration and throws an AuthorizationException
    * if any problems are found. If authorization is disabled, config checks are skipped.
    */
-  public void validateConfig() throws IllegalArgumentException {
+  private void validateConfig() throws IllegalArgumentException {
     // If authorization is not enabled, config checks are skipped.
     if (!isEnabled()) return;
 
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index d9f3f06..0cf5390 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -142,7 +142,6 @@ public class JniFrontend {
     AuthorizationConfig authConfig = new AuthorizationConfig(cfg.server_name,
         cfg.authorization_policy_file, cfg.sentry_config,
         cfg.authorization_policy_provider_class);
-    authConfig.validateConfig();
     if (authConfig.isEnabled()) {
       LOG.info(String.format("Authorization is 'ENABLED' using %s",
           authConfig.isFileBasedPolicy() ? " file based policy from: " +
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java b/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
index 484ae8e..680efa9 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
@@ -364,12 +364,12 @@ public class AuditingTest extends FrontendTestBase {
   }
 
   @Test
-  public void TestAccessEventsOnAuthFailure() throws AuthorizationException,
-      ImpalaException {
+  public void TestAccessEventsOnAuthFailure() throws ImpalaException {
     // The policy file doesn't exist so all operations will result in
     // an AuthorizationError
     AuthorizationConfig config = AuthorizationConfig.createHadoopGroupAuthConfig(
-        "server1", "/does/not/exist", "");
+        "server1", "/does/not/exist", System.getenv("IMPALA_HOME") +
+        "/fe/src/test/resources/sentry-site.xml");
     try (ImpaladCatalog catalog = new ImpaladTestCatalog(config)) {
       Frontend fe = new Frontend(config, catalog);
       AnalysisContext analysisCtx = createAnalysisCtx(config);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
index ccdc8c4..15a06ee 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
@@ -141,7 +141,6 @@ public class AuthorizationTest extends FrontendTestBase {
     testCtxs_ = new ArrayList<>();
     // Create and init file based auth config.
     AuthorizationConfig filePolicyAuthzConfig = createPolicyFileAuthzConfig();
-    filePolicyAuthzConfig.validateConfig();
     ImpaladTestCatalog filePolicyCatalog = new ImpaladTestCatalog(filePolicyAuthzConfig);
     testCtxs_.add(new TestContext(filePolicyAuthzConfig, filePolicyCatalog));
 
@@ -180,7 +179,6 @@ public class AuthorizationTest extends FrontendTestBase {
     AuthorizationConfig result =
         AuthorizationConfig.createHadoopGroupAuthConfig("server1", AUTHZ_POLICY_FILE,
         System.getenv("IMPALA_HOME") + "/fe/src/test/resources/sentry-site.xml");
-    result.validateConfig();
     return result;
   }
 
@@ -841,7 +839,7 @@ public class AuthorizationTest extends FrontendTestBase {
     //     </value>
     //   </property>
     AuthorizationConfig authzConfig = new AuthorizationConfig("server1",
-        AUTHZ_POLICY_FILE, "",
+        AUTHZ_POLICY_FILE, ctx_.authzConfig.getSentryConfig().getConfigFile(),
         LocalGroupResourceAuthorizationProvider.class.getName());
     try (ImpaladCatalog catalog = new ImpaladTestCatalog(authzConfig)) {
       // This test relies on the auth_to_local rule -
@@ -903,7 +901,8 @@ public class AuthorizationTest extends FrontendTestBase {
     if (ctx_.authzConfig.isFileBasedPolicy()) {
       // Authorization config that has a different server name from policy file.
       TestWithIncorrectConfig(AuthorizationConfig.createHadoopGroupAuthConfig(
-          "differentServerName", AUTHZ_POLICY_FILE, ""),
+          "differentServerName", AUTHZ_POLICY_FILE,
+          ctx_.authzConfig.getSentryConfig().getConfigFile()),
           new User(System.getProperty("user.name")));
     } // TODO: Test using policy server.
   }
@@ -917,7 +916,8 @@ public class AuthorizationTest extends FrontendTestBase {
     // Use a HadoopGroupProvider in this case so the user -> group mappings can still be
     // resolved in the absence of the policy file.
     TestWithIncorrectConfig(AuthorizationConfig.createHadoopGroupAuthConfig("server1",
-        AUTHZ_POLICY_FILE + "_does_not_exist", ""),
+        AUTHZ_POLICY_FILE + "_does_not_exist",
+        ctx_.authzConfig.getSentryConfig().getConfigFile()),
         new User(System.getProperty("user.name")));
   }
 
@@ -927,87 +927,83 @@ public class AuthorizationTest extends FrontendTestBase {
     // Valid configs pass validation.
     AuthorizationConfig config = AuthorizationConfig.createHadoopGroupAuthConfig(
         "server1", AUTHZ_POLICY_FILE, sentryConfig);
-    config.validateConfig();
     Assert.assertTrue(config.isEnabled());
     Assert.assertTrue(config.isFileBasedPolicy());
 
     config = AuthorizationConfig.createHadoopGroupAuthConfig("server1", null,
         sentryConfig);
-    config.validateConfig();
     Assert.assertTrue(config.isEnabled());
     Assert.assertTrue(!config.isFileBasedPolicy());
 
     // Invalid configs
     // No sentry configuration file.
-    config = AuthorizationConfig.createHadoopGroupAuthConfig(
-        "server1", AUTHZ_POLICY_FILE, null);
-    Assert.assertTrue(config.isEnabled());
     try {
-      config.validateConfig();
+      config = AuthorizationConfig.createHadoopGroupAuthConfig(
+          "server1", AUTHZ_POLICY_FILE, null);
+      Assert.assertTrue(config.isEnabled());
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "A valid path to a sentry-site.xml config " +
-          "file must be set using --sentry_config to enable authorization.");
+      Assert.assertEquals("A valid path to a sentry-site.xml config " +
+          "file must be set using --sentry_config to enable authorization.",
+          e.getMessage());
     }
 
     // Empty / null server name.
-    config = AuthorizationConfig.createHadoopGroupAuthConfig(
-        "", AUTHZ_POLICY_FILE, sentryConfig);
-    Assert.assertTrue(config.isEnabled());
     try {
-      config.validateConfig();
+      config = AuthorizationConfig.createHadoopGroupAuthConfig(
+          "", AUTHZ_POLICY_FILE, sentryConfig);
+      Assert.assertTrue(config.isEnabled());
       fail("Expected configuration to fail.");
     } catch (IllegalArgumentException e) {
-      Assert.assertEquals(e.getMessage(),
+      Assert.assertEquals(
           "Authorization is enabled but the server name is null or empty. Set the " +
-          "server name using the impalad --server_name flag.");
+          "server name using the impalad --server_name flag.",
+          e.getMessage());
     }
-    config = AuthorizationConfig.createHadoopGroupAuthConfig(null, AUTHZ_POLICY_FILE,
-        sentryConfig);
-    Assert.assertTrue(config.isEnabled());
     try {
-      config.validateConfig();
+      config = AuthorizationConfig.createHadoopGroupAuthConfig(null, AUTHZ_POLICY_FILE,
+          sentryConfig);
+      Assert.assertTrue(config.isEnabled());
       fail("Expected configuration to fail.");
     } catch (IllegalArgumentException e) {
-      Assert.assertEquals(e.getMessage(),
+      Assert.assertEquals(
           "Authorization is enabled but the server name is null or empty. Set the " +
-          "server name using the impalad --server_name flag.");
+          "server name using the impalad --server_name flag.",
+          e.getMessage());
     }
 
     // Sentry config file does not exist.
-    config = AuthorizationConfig.createHadoopGroupAuthConfig("server1", "",
-        "/path/does/not/exist.xml");
-    Assert.assertTrue(config.isEnabled());
     try {
-      config.validateConfig();
+      config = AuthorizationConfig.createHadoopGroupAuthConfig("server1", "",
+          "/path/does/not/exist.xml");
+      Assert.assertTrue(config.isEnabled());
       fail("Expected configuration to fail.");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(),
-          "Sentry configuration file does not exist: \"/path/does/not/exist.xml\"");
+      Assert.assertEquals(
+          "Sentry configuration file does not exist: \"/path/does/not/exist.xml\"",
+          e.getMessage());
     }
 
     // Invalid ResourcePolicyProvider class name.
-    config = new AuthorizationConfig("server1", AUTHZ_POLICY_FILE, "",
-        "ClassDoesNotExist");
-    Assert.assertTrue(config.isEnabled());
     try {
-      config.validateConfig();
-      fail("Expected configuration to fail.");
+      config = new AuthorizationConfig("server1", AUTHZ_POLICY_FILE, sentryConfig,
+          "ClassDoesNotExist");
+      Assert.assertTrue(config.isEnabled());      fail("Expected configuration to fail.");
     } catch (IllegalArgumentException e) {
-      Assert.assertEquals(e.getMessage(),
-          "The authorization policy provider class 'ClassDoesNotExist' was not found.");
+      Assert.assertEquals(
+          "The authorization policy provider class 'ClassDoesNotExist' was not found.",
+          e.getMessage());
     }
 
     // Valid class name, but class is not derived from ResourcePolicyProvider
-    config = new AuthorizationConfig("server1", AUTHZ_POLICY_FILE, "",
-        this.getClass().getName());
-    Assert.assertTrue(config.isEnabled());
     try {
-      config.validateConfig();
-      fail("Expected configuration to fail.");
+      config = new AuthorizationConfig("server1", AUTHZ_POLICY_FILE, sentryConfig,
+          this.getClass().getName());
+      Assert.assertTrue(config.isEnabled());      fail("Expected configuration to fail.");
     } catch (IllegalArgumentException e) {
-      Assert.assertEquals(e.getMessage(), String.format("The authorization policy " +
+      Assert.assertEquals(String.format("The authorization policy " +
           "provider class '%s' must be a subclass of '%s'.", this.getClass().getName(),
-          ResourceAuthorizationProvider.class.getName()));
+          ResourceAuthorizationProvider.class.getName()),
+          e.getMessage());
     }
 
     // Config validations skipped if authorization disabled
@@ -1027,7 +1023,7 @@ public class AuthorizationTest extends FrontendTestBase {
     // Use an authorization configuration that uses the
     // LocalGroupResourceAuthorizationProvider.
     AuthorizationConfig authzConfig = new AuthorizationConfig("server1",
-        AUTHZ_POLICY_FILE, "",
+        AUTHZ_POLICY_FILE, ctx_.authzConfig.getSentryConfig().getConfigFile(),
         LocalGroupResourceAuthorizationProvider.class.getName());
     try (ImpaladCatalog catalog = new ImpaladTestCatalog(authzConfig)) {
       // Create an analysis context + FE with the test user
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 85ecc4c..2b6a640 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -451,7 +451,6 @@ public class FrontendTestBase {
     AuthorizationConfig authzConfig = AuthorizationConfig.createHadoopGroupAuthConfig(
         "server1", null, System.getenv("IMPALA_HOME") +
             "/fe/src/test/resources/sentry-site.xml");
-    authzConfig.validateConfig();
     return authzConfig;
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java b/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java
index 7ab8e56..9d24f62 100644
--- a/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java
+++ b/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java
@@ -61,7 +61,6 @@ public class SentryProxyTest {
     authzConfig_ = AuthorizationConfig.createHadoopGroupAuthConfig(
         SENTRY_SERVER, null, System.getenv("IMPALA_HOME") +
             "/fe/src/test/resources/sentry-site.xml");
-    authzConfig_.validateConfig();
     sentryService_ = new SentryPolicyService(authzConfig_.getSentryConfig());
   }
 


[impala] 03/06: IMPALA-8034: Improve planner tests

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b08c8e3db2e769609f47d5c0ed87c547d41d1c8b
Author: paul-rogers <pr...@cloudera.com>
AuthorDate: Mon Dec 31 17:47:13 2018 -0800

    IMPALA-8034: Improve planner tests
    
    The FE PlannerTest cases are good, but often unrealistic and overly
    complicated, especially when trying to verify selectivity and
    cardinality. This commit adds new tests that isolate each bit of the
    work for detailed inspection.
    
    The current version of the tests highlights a number of bugs to be
    fixed by ongoing work. This commit establishes a clear baseline of
    current behavior, even if that behavior is not quite right. A "Bug:"
    comment explains the expected result.
    
    Tests: These are tests, no production code was changed.
    
    Change-Id: I40e59e08d7ddf2b0391d42e50511aaf95d7275f4
    Reviewed-on: http://gerrit.cloudera.org:8080/12145
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/planner/PlannerTest.java     |  32 +
 .../queries/PlannerTest/card-inner-join.test       | 985 +++++++++++++++++++++
 .../queries/PlannerTest/card-multi-join.test       | 275 ++++++
 .../queries/PlannerTest/card-outer-join.test       | 812 +++++++++++++++++
 .../queries/PlannerTest/card-scan.test             | 442 +++++++++
 5 files changed, 2546 insertions(+)

diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 4345bfb..cf1b0d8 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -49,6 +49,38 @@ import com.google.common.collect.Lists;
 // All planner tests, except for S3 specific tests should go here.
 public class PlannerTest extends PlannerTestBase {
 
+  /**
+   * Scan node cardinality test
+   */
+  @Test
+  public void testScanCardinality() {
+    runPlannerTestFile("card-scan");
+  }
+
+  /**
+   * Inner join cardinality test
+   */
+  @Test
+  public void testInnerJoinCardinality() {
+    runPlannerTestFile("card-inner-join");
+  }
+
+  /**
+   * Outer join cardinality test
+   */
+  @Test
+  public void testOuterJoinCardinality() {
+    runPlannerTestFile("card-outer-join");
+  }
+
+  /**
+   * 3+ table join cardinality test
+   */
+  @Test
+  public void testMultiJoinCardinality() {
+    runPlannerTestFile("card-multi-join");
+  }
+
   @Test
   public void testPredicatePropagation() {
     runPlannerTestFile("predicate-propagation");
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/card-inner-join.test b/testdata/workloads/functional-planner/queries/PlannerTest/card-inner-join.test
new file mode 100644
index 0000000..2da3d1a
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/card-inner-join.test
@@ -0,0 +1,985 @@
+# Join cardinality tests
+# Each has a qualitative description of the quantitative math
+#
+# Simplest join: Cartesian product
+# card = |T1| * |T2|
+select m.id, d.id
+from functional.alltypestiny m,
+     functional.alltypessmall d
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=8B cardinality=800
+|
+|--00:SCAN HDFS [functional.alltypestiny m]
+|     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
+|
+01:SCAN HDFS [functional.alltypessmall d]
+   partitions=4/4 files=4 size=6.32KB
+   row-size=4B cardinality=100
+====
+# Cartesian product between table and a no-stats, 0-row table
+select a.id, b.id
+from functional.alltypestiny a,
+     functional.alltypesnopart b
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=8B cardinality=0
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypestiny a]
+   partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
+====
+# Cartesian product between two empty tables
+select a.id, b.id
+from functional.alltypesnopart a,
+     functional.alltypesnopart b
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=8B cardinality=0
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypesnopart a]
+   partitions=1/1 files=0 size=0B
+   row-size=4B cardinality=0
+====
+# Simplest M:1 join, join between a master table (M) and a detail table (D)
+# (small):(tiny)
+# |join| = |D|
+select m.id, d.id
+from functional.alltypestiny m,
+     functional.alltypessmall d
+where m.id = d.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.id = m.id
+|  runtime filters: RF000 <- m.id
+|  row-size=8B cardinality=9
+|
+|--00:SCAN HDFS [functional.alltypestiny m]
+|     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
+|
+01:SCAN HDFS [functional.alltypessmall d]
+   partitions=4/4 files=4 size=6.32KB
+   runtime filters: RF000 -> d.id
+   row-size=4B cardinality=100
+====
+# Same, but reverse WHERE order
+select m.id, d.id
+from functional.alltypestiny m,
+     functional.alltypessmall d
+where d.id = m.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.id = m.id
+|  runtime filters: RF000 <- m.id
+|  row-size=8B cardinality=9
+|
+|--00:SCAN HDFS [functional.alltypestiny m]
+|     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
+|
+01:SCAN HDFS [functional.alltypessmall d]
+   partitions=4/4 files=4 size=6.32KB
+   runtime filters: RF000 -> d.id
+   row-size=4B cardinality=100
+====
+# Same, but use JOIN ... ON
+select m.id, d.id
+from functional.alltypestiny m
+join functional.alltypessmall d on m.id = d.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.id = m.id
+|  runtime filters: RF000 <- m.id
+|  row-size=8B cardinality=9
+|
+|--00:SCAN HDFS [functional.alltypestiny m]
+|     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
+|
+01:SCAN HDFS [functional.alltypessmall d]
+   partitions=4/4 files=4 size=6.32KB
+   runtime filters: RF000 -> d.id
+   row-size=4B cardinality=100
+====
+# Same, reverse ON order
+select m.id, d.id
+from functional.alltypestiny m
+join functional.alltypessmall d on d.id = m.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.id = m.id
+|  runtime filters: RF000 <- m.id
+|  row-size=8B cardinality=9
+|
+|--00:SCAN HDFS [functional.alltypestiny m]
+|     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
+|
+01:SCAN HDFS [functional.alltypessmall d]
+   partitions=4/4 files=4 size=6.32KB
+   runtime filters: RF000 -> d.id
+   row-size=4B cardinality=100
+====
+# Same, but reverse FROM order
+select m.id, d.id
+from functional.alltypessmall d,
+     functional.alltypestiny m
+where m.id = d.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.id = m.id
+|  runtime filters: RF000 <- m.id
+|  row-size=8B cardinality=9
+|
+|--01:SCAN HDFS [functional.alltypestiny m]
+|     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
+|
+00:SCAN HDFS [functional.alltypessmall d]
+   partitions=4/4 files=4 size=6.32KB
+   runtime filters: RF000 -> d.id
+   row-size=4B cardinality=100
+====
+# Same, but use JOIN ... ON
+select m.id, d.id
+from functional.alltypessmall m
+join functional.alltypestiny d on m.id = d.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: m.id = d.id
+|  runtime filters: RF000 <- d.id
+|  row-size=8B cardinality=9
+|
+|--01:SCAN HDFS [functional.alltypestiny d]
+|     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
+|
+00:SCAN HDFS [functional.alltypessmall m]
+   partitions=4/4 files=4 size=6.32KB
+   runtime filters: RF000 -> m.id
+   row-size=4B cardinality=100
+====
+# Similar, but with big tables
+# |M| = |customer| = 150K
+# |M.pk| = |customer.c_custkey| = |customer| = 150K
+# |D| = |orders| = 1.5M
+# |D.fk| = |orders.o_custkey| = 100K
+# |join| = |D| = |orders|
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=24B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=1.50M
+====
+# Simple join between table and a no-stats, 0-row table
+# Bug: expected cardinality = 0
+select a.id, b.id
+from functional.alltypestiny a,
+     functional.alltypesnopart b
+WHERE a.id = b.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=8B cardinality=8
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypestiny a]
+   partitions=4/4 files=4 size=460B
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=8
+====
+# Simple join between two empty tables
+select a.id, b.id
+from functional.alltypesnopart a,
+     functional.alltypesnopart b
+WHERE a.id = b.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=8B cardinality=0
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypesnopart a]
+   partitions=1/1 files=0 size=0B
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=0
+====
+# Selective master filtering
+# |join| = |D|/|D.fk| = 15
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_name = 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=54B cardinality=16
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_name = 'foo'
+|     row-size=38B cardinality=1
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=1.50M
+====
+# Selective master filtering, column common with detail
+# |join| = |D|/|D.fk| = 15
+# Bug: IMPALA-8014, expected cardinality ~15
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=24B cardinality=1
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_custkey = 10
+|     row-size=8B cardinality=1
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: d.o_custkey = 10
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=15
+====
+# Selective detail filtering
+# Card = 1: one detail record finds its master record
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and d.o_orderkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: m.c_custkey = d.o_custkey
+|  runtime filters: RF000 <- d.o_custkey
+|  row-size=24B cardinality=1
+|
+|--01:SCAN HDFS [tpch.orders d]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: d.o_orderkey = 10
+|     row-size=16B cardinality=1
+|
+00:SCAN HDFS [tpch.customer m]
+   partitions=1/1 files=1 size=23.08MB
+   runtime filters: RF000 -> m.c_custkey
+   row-size=8B cardinality=150.00K
+====
+# Broad master filtering
+# |M'| = |M| * 0.33 = 50K
+# Each master finds all its details
+# |join|= |M'| * |D|/|D.fk| = 50K * 15 = 750K
+# Though |M.pk| > |D.fk|, we assume that filtering eliminated the unmatched keys
+# Bug: Several, expected cardinality ~750K
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=54B cardinality=228.68K
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_name < 'foo'
+|     row-size=38B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=1.50M
+====
+# Broad master filtering, column common with detail
+# |D'| = |D| * 0.33 = 500K
+# Each detail finds its master
+# |join| = |D'| = 500K
+# Bug: Expected cardinality ~500K
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_custkey < 1234
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=24B cardinality=22.87K
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_custkey < 1234
+|     row-size=8B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: d.o_custkey < 1234
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=150.00K
+====
+# Broad detail filtering
+# |D'| = |D| * 0.33 = 500K
+# Every detail finds its master
+# Card = 500K
+# Bug: Expected cardinality ~500K
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and d.o_orderkey < 1234
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: d.o_orderkey < 1234
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=150.00K
+====
+# Filtering on join between table and a no-stats, 0-row table
+# Inner join with a zero-sized table produces 0 rows.
+select a.id, b.id
+from functional.alltypestiny a,
+     functional.alltypesnopart b
+WHERE a.id = b.id
+  AND a.id = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=8B cardinality=1
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     predicates: b.id = 10
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypestiny a]
+   partitions=4/4 files=4 size=460B
+   predicates: a.id = 10
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=1
+====
+# Filtering on join between table and a no-stats, 0-row table
+# Bug: Expected cardinality ~1
+select a.id, b.id
+from functional.alltypestiny a,
+     functional.alltypesnopart b
+WHERE a.id = b.id
+  AND b.id = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=8B cardinality=1
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     predicates: b.id = 10
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypestiny a]
+   partitions=4/4 files=4 size=460B
+   predicates: a.id = 10
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=1
+====
+# Filtering on join between two empty tables
+select a.id, b.id
+from functional.alltypesnopart a,
+     functional.alltypesnopart b
+WHERE a.id = b.id
+  AND a.id = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=8B cardinality=0
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     predicates: b.id = 10
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypesnopart a]
+   partitions=1/1 files=0 size=0B
+   predicates: a.id = 10
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=0
+====
+# Selective filtering on both sides
+# |D'| = 1
+# |C'| = 1
+# |D' >< C'| = 0 or 1, depending whether we're lucky
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_name = 'foo'
+  and d.o_orderkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: m.c_custkey = d.o_custkey
+|  runtime filters: RF000 <- d.o_custkey
+|  row-size=54B cardinality=1
+|
+|--01:SCAN HDFS [tpch.orders d]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: d.o_orderkey = 10
+|     row-size=16B cardinality=1
+|
+00:SCAN HDFS [tpch.customer m]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: m.c_name = 'foo'
+   runtime filters: RF000 -> m.c_custkey
+   row-size=38B cardinality=1
+====
+# Correlated filtering on master
+# |join| = |D| / |D.fk|
+# Bug: Expected cardinality 15
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=24B cardinality=1
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_custkey = 10
+|     row-size=8B cardinality=1
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: d.o_custkey = 10
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=15
+====
+# Correlated filtering on detail
+# |join| = |D| / |D.fk|
+# Bug: Expected cardinality 15
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and d.o_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=24B cardinality=1
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_custkey = 10
+|     row-size=8B cardinality=1
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: d.o_custkey = 10
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=15
+====
+# Redundant correlated filtering on both sides
+# Same as above case internally
+# Bug: Expected cardinality ~1
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_name = 'foo'
+  and m.c_custkey = 10
+  and d.o_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=54B cardinality=1
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_custkey = 10, m.c_name = 'foo'
+|     row-size=38B cardinality=1
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: d.o_custkey = 10
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=15
+====
+# Selective filtering on master, broad on detail
+# |M'| = 1
+# |D'| = |D| * 0.33, uncorrelated with above
+# |D.fk'| = |D.fk| * 0.33
+# Since we assume containment: that the smaller key set
+# is a subset of the larger one.
+# Card = |D'| / max(1, |D.fk'|) = 5
+# Bug: Expected cardinality ~5
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_name = 'foo'
+  and d.o_orderkey < 1234
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: d.o_custkey = m.c_custkey
+|  runtime filters: RF000 <- m.c_custkey
+|  row-size=54B cardinality=2
+|
+|--00:SCAN HDFS [tpch.customer m]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: m.c_name = 'foo'
+|     row-size=38B cardinality=1
+|
+01:SCAN HDFS [tpch.orders d]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: d.o_orderkey < 1234
+   runtime filters: RF000 -> d.o_custkey
+   row-size=16B cardinality=150.00K
+====
+# Selective filtering on detail, broad on master
+# |M'| = |M| * 0.33
+# |D'| = 1
+# 1/3 chance that the detail finds its master, so card = 0 or 1
+select m.c_custkey, d.o_orderkey
+from tpch.customer m,
+     tpch.orders d
+where m.c_custkey = d.o_custkey
+  and m.c_name < 'foo'
+  and d.o_orderkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: m.c_custkey = d.o_custkey
+|  runtime filters: RF000 <- d.o_custkey
+|  row-size=54B cardinality=1
+|
+|--01:SCAN HDFS [tpch.orders d]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: d.o_orderkey = 10
+|     row-size=16B cardinality=1
+|
+00:SCAN HDFS [tpch.customer m]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: m.c_name < 'foo'
+   runtime filters: RF000 -> m.c_custkey
+   row-size=38B cardinality=15.00K
+====
+# M:N join
+# |partsupp| = 800K
+# |lineitem| = 6M
+# NDV(ps_suppkey) = NDV(l_suppkey) = |suppkey| = 9.7K
+# |lineitem >< partsupp| = (|lineitem| * |partsupp|) / |suppkey|
+#                        = 800K * 6M / 10K = 80 * 6M = 480M
+select t1.ps_suppkey, t2.l_suppkey
+from tpch.partsupp t1,
+     tpch.lineitem t2
+where t1.ps_suppkey = t2.l_suppkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.l_suppkey = t1.ps_suppkey
+|  runtime filters: RF000 <- t1.ps_suppkey
+|  row-size=16B cardinality=494.33M
+|
+|--00:SCAN HDFS [tpch.partsupp t1]
+|     partitions=1/1 files=1 size=112.71MB
+|     row-size=8B cardinality=800.00K
+|
+01:SCAN HDFS [tpch.lineitem t2]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> t2.l_suppkey
+   row-size=8B cardinality=6.00M
+====
+# M:N join with filtering on smaller table
+# NDV(availqty) = |availqty| = 10K
+# |partsupp'| = |partsupp| / |availqty|
+#             = 800K / 10K = 80
+# |lineitem >< partsupp'| = |lineitem| * |partsupp'| / |suppkey|
+#                         = 6M * 80 / 10K = 48K
+select t1.ps_suppkey, t2.l_suppkey
+from tpch.partsupp t1,
+     tpch.lineitem t2
+where t1.ps_suppkey = t2.l_suppkey
+  and t1.ps_availqty = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.l_suppkey = t1.ps_suppkey
+|  runtime filters: RF000 <- t1.ps_suppkey
+|  row-size=20B cardinality=48.20K
+|
+|--00:SCAN HDFS [tpch.partsupp t1]
+|     partitions=1/1 files=1 size=112.71MB
+|     predicates: t1.ps_availqty = 10
+|     row-size=12B cardinality=78
+|
+01:SCAN HDFS [tpch.lineitem t2]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> t2.l_suppkey
+   row-size=8B cardinality=6.00M
+====
+# M:N join with filtering on larger table
+# NDV(l_partkey) = |l_partkey| = 200K
+# |lineitem'| = |lineitem| / |l_partkey| = 6M / 200K = 30
+# |lineitem' >< partsupp| = |lineitem'| * |partsupp| / |suppkey|
+#                         = 30 * 800K / 10K = 2400
+select t1.ps_suppkey, t2.l_suppkey
+from tpch.partsupp t1,
+     tpch.lineitem t2
+where t1.ps_suppkey = t2.l_suppkey
+  and t2.l_partkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.ps_suppkey = t2.l_suppkey
+|  runtime filters: RF000 <- t2.l_suppkey
+|  row-size=24B cardinality=2.47K
+|
+|--01:SCAN HDFS [tpch.lineitem t2]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: t2.l_partkey = 10
+|     row-size=16B cardinality=30
+|
+00:SCAN HDFS [tpch.partsupp t1]
+   partitions=1/1 files=1 size=112.71MB
+   runtime filters: RF000 -> t1.ps_suppkey
+   row-size=8B cardinality=800.00K
+====
+# M:N join with selective filtering on shared column
+# |partsupp'| = |partsupp| / |suppkey| = 800K / 10K = 80
+# But, filtering is done on the join column, so we want:
+# |lineitem >< partsupp'| = |lineitem| * |partsupp'|
+#                         = (80 * 6M) / 10K = 80 * 600 = 48K
+select t1.ps_suppkey, t2.l_suppkey
+from tpch.partsupp t1,
+     tpch.lineitem t2
+where t1.ps_suppkey = t2.l_suppkey
+  and t1.ps_suppkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.l_suppkey = t1.ps_suppkey
+|  runtime filters: RF000 <- t1.ps_suppkey
+|  row-size=16B cardinality=50.67K
+|
+|--00:SCAN HDFS [tpch.partsupp t1]
+|     partitions=1/1 files=1 size=112.71MB
+|     predicates: t1.ps_suppkey = 10
+|     row-size=8B cardinality=82
+|
+01:SCAN HDFS [tpch.lineitem t2]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: t2.l_suppkey = 10
+   runtime filters: RF000 -> t2.l_suppkey
+   row-size=8B cardinality=618
+====
+# M:N join with broad filtering on smaller table
+# Most general join case
+# |partsupp'| = |partsupp| * 0.33
+#             = 800K / 3 = 267K
+# |lineitem >< partsupp'| = |lineitem| * |partsupp'| / |suppkey|
+#             = 267K * 6M / 10K = 160M
+# Bug: Expected cardinality ~160M
+select t1.ps_suppkey, t2.l_suppkey
+from tpch.partsupp t1,
+     tpch.lineitem t2
+where t1.ps_suppkey = t2.l_suppkey
+  and t1.ps_availqty < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.l_suppkey = t1.ps_suppkey
+|  runtime filters: RF000 <- t1.ps_suppkey
+|  row-size=20B cardinality=49.43M
+|
+|--00:SCAN HDFS [tpch.partsupp t1]
+|     partitions=1/1 files=1 size=112.71MB
+|     predicates: t1.ps_availqty < 10
+|     row-size=12B cardinality=80.00K
+|
+01:SCAN HDFS [tpch.lineitem t2]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> t2.l_suppkey
+   row-size=8B cardinality=6.00M
+====
+# M:N join with broad filtering on larger table
+# |lineitem'| = |lineitem| * 0.33 = 6M / 3 = 2M
+# |lineitem' >< partsupp| = |lineitem'| * |partsupp| / |suppkey|
+#             = 2M * 800K / 10K = 160M
+# Bug: Expected cardinality ~161M
+select t1.ps_suppkey, t2.l_suppkey
+from tpch.partsupp t1,
+     tpch.lineitem t2
+where t1.ps_suppkey = t2.l_suppkey
+  and t2.l_partkey < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.l_suppkey = t1.ps_suppkey
+|  runtime filters: RF000 <- t1.ps_suppkey
+|  row-size=24B cardinality=49.43M
+|
+|--00:SCAN HDFS [tpch.partsupp t1]
+|     partitions=1/1 files=1 size=112.71MB
+|     row-size=8B cardinality=800.00K
+|
+01:SCAN HDFS [tpch.lineitem t2]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: t2.l_partkey < 10
+   runtime filters: RF000 -> t2.l_suppkey
+   row-size=16B cardinality=600.12K
+====
+# M:N join with filtering on both sides
+# Most general join case
+# |lineitem'| = |lineitem| * 0.33
+# |partsupp'| = |partsupp| * 0.33
+# |suppkey'| = |suppkey| * 0.33
+# |lineitem' >< partsupp'| = |lineitem'| * |partsupp'| / |suppkey'|
+#              = 800K/3 * 6M/3 / 3.3K = 161M
+# (Recall the containment assumption above.)
+# Bug: Expected cardinality ~161M
+select t1.ps_suppkey, t2.l_suppkey
+from tpch.partsupp t1,
+     tpch.lineitem t2
+where t1.ps_suppkey = t2.l_suppkey
+  and t1.ps_availqty < 10
+  and t2.l_partkey < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.l_suppkey = t1.ps_suppkey
+|  runtime filters: RF000 <- t1.ps_suppkey
+|  row-size=28B cardinality=49.43M
+|
+|--00:SCAN HDFS [tpch.partsupp t1]
+|     partitions=1/1 files=1 size=112.71MB
+|     predicates: t1.ps_availqty < 10
+|     row-size=12B cardinality=80.00K
+|
+01:SCAN HDFS [tpch.lineitem t2]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: t2.l_partkey < 10
+   runtime filters: RF000 -> t2.l_suppkey
+   row-size=16B cardinality=600.12K
+====
+# Join with join-level filter predicates
+# |join| = |orders| * sel(c.nationkey + o.o_shippriority = 10)
+# |orders| = 1.5M
+# sel(c.nationkey + o.o_shippriority = 10) = 0.1 (can't use NDV)
+# |join| = 1.5M * .1 = 150K
+# Bug: Several, expected cardinality ~150K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c,
+     tpch.orders o
+where c.c_custkey = o.o_custkey
+  and c.c_nationkey + o.o_shippriority = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority = 10
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=30B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> o.o_custkey
+   row-size=20B cardinality=1.50M
+====
+# Join with a table that has no stats and zero rows.
+# The zero rows trumps the stats.
+# Zero-row table is on the build side, |join| = 0
+# Bug: Expected cardinality = 0
+select a.int_col, b.int_col
+from functional.alltypesagg a, functional.alltypesnopart b
+where a.id = b.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=16B cardinality=11.00K
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional.alltypesagg a]
+   partitions=11/11 files=11 size=814.73KB
+   runtime filters: RF000 -> a.id
+   row-size=8B cardinality=11.00K
+====
+# Adding table and join filters should not affect the result.
+# Bug: Expected cardinality = 0
+select a.int_col, b.int_col
+from functional.alltypesagg a, functional.alltypesnopart b
+where a.id = b.id
+  and a.smallint_col = 10
+  and b.smallint_col = 20
+  and a.int_col + b.int_col > 30
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  other predicates: a.int_col + b.int_col > 30
+|  runtime filters: RF000 <- b.id
+|  row-size=20B cardinality=113
+|
+|--01:SCAN HDFS [functional.alltypesnopart b]
+|     partitions=1/1 files=0 size=0B
+|     predicates: b.smallint_col = 20
+|     row-size=10B cardinality=0
+|
+00:SCAN HDFS [functional.alltypesagg a]
+   partitions=11/11 files=11 size=814.73KB
+   predicates: a.smallint_col = 10
+   runtime filters: RF000 -> a.id
+   row-size=10B cardinality=113
+====
+# Join on a computed column
+# Assumes Cartesian product * 0.1
+# |join| = 11K * 7K * 0.1 = 7M
+# Bug: Expected cardinality ~7M
+select a.id, b.id
+from functional.alltypes a, functional.alltypesagg b
+where a.id = b.id + b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.id + b.int_col = a.id
+|  runtime filters: RF000 <- a.id
+|  row-size=12B cardinality=11.00K
+|
+|--00:SCAN HDFS [functional.alltypes a]
+|     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
+|
+01:SCAN HDFS [functional.alltypesagg b]
+   partitions=11/11 files=11 size=814.73KB
+   runtime filters: RF000 -> b.id + b.int_col
+   row-size=8B cardinality=11.00K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/card-multi-join.test b/testdata/workloads/functional-planner/queries/PlannerTest/card-multi-join.test
new file mode 100644
index 0000000..02cf737
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/card-multi-join.test
@@ -0,0 +1,275 @@
+# Three table 1:M join
+# |customer| = 150K
+# |customer.c_custkey| = |customer| = 150K
+# |orders| = 1.5M
+# |orders.o_custkey| = 100K
+# |lineitem| = 6M
+# |lineitem.l_orderkey| = 1.5M
+# |join| = |lineitem| = 6M
+select c.c_name, o.o_orderkey, i.l_linenumber
+from tpch.customer c,
+     tpch.orders o,
+     tpch.lineitem i
+where c.c_custkey = o.o_custkey
+  and o.o_orderkey = i.l_orderkey
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=66B cardinality=5.76M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=38B cardinality=150.00K
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: i.l_orderkey = o.o_orderkey
+|  runtime filters: RF002 <- o.o_orderkey
+|  row-size=28B cardinality=5.76M
+|
+|--01:SCAN HDFS [tpch.orders o]
+|     partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF000 -> o.o_custkey
+|     row-size=16B cardinality=1.50M
+|
+02:SCAN HDFS [tpch.lineitem i]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF002 -> i.l_orderkey
+   row-size=12B cardinality=6.00M
+====
+# Filter on customer table
+# |join| = 1/3 of above = 2M
+# Bug: Expected cardinality ~2M
+select c.c_name, o.o_orderkey, i.l_linenumber
+from tpch.customer c,
+     tpch.orders o,
+     tpch.lineitem i
+where c.c_custkey = o.o_custkey
+  and o.o_orderkey = i.l_orderkey
+  and c.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=66B cardinality=877.79K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name < 'foo'
+|     row-size=38B cardinality=15.00K
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: i.l_orderkey = o.o_orderkey
+|  runtime filters: RF002 <- o.o_orderkey
+|  row-size=28B cardinality=5.76M
+|
+|--01:SCAN HDFS [tpch.orders o]
+|     partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF000 -> o.o_custkey
+|     row-size=16B cardinality=1.50M
+|
+02:SCAN HDFS [tpch.lineitem i]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF002 -> i.l_orderkey
+   row-size=12B cardinality=6.00M
+====
+# Filter on orders table
+# |join| = 1/3 of full join = 2M
+# Bug: Expected cardinality ~2M
+select c.c_name, o.o_orderkey, i.l_linenumber
+from tpch.customer c,
+     tpch.orders o,
+     tpch.lineitem i
+where c.c_custkey = o.o_custkey
+  and o.o_orderkey = i.l_orderkey
+  and o.o_clerk < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=93B cardinality=575.77K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=38B cardinality=150.00K
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: i.l_orderkey = o.o_orderkey
+|  runtime filters: RF002 <- o.o_orderkey
+|  row-size=55B cardinality=575.77K
+|
+|--01:SCAN HDFS [tpch.orders o]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o.o_clerk < 'foo'
+|     runtime filters: RF000 -> o.o_custkey
+|     row-size=43B cardinality=150.00K
+|
+02:SCAN HDFS [tpch.lineitem i]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF002 -> i.l_orderkey
+   row-size=12B cardinality=6.00M
+====
+# Filter on items table
+# |join| = 1/3 of full join = 2M
+# Bug: Expected cardinality ~2M
+select c.c_name, o.o_orderkey, i.l_linenumber
+from tpch.customer c,
+     tpch.orders o,
+     tpch.lineitem i
+where c.c_custkey = o.o_custkey
+  and o.o_orderkey = i.l_orderkey
+  and i.l_suppkey < 1234
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=74B cardinality=600.12K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=38B cardinality=150.00K
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = i.l_orderkey
+|  runtime filters: RF002 <- i.l_orderkey
+|  row-size=36B cardinality=600.12K
+|
+|--02:SCAN HDFS [tpch.lineitem i]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: i.l_suppkey < 1234
+|     row-size=20B cardinality=600.12K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> o.o_custkey, RF002 -> o.o_orderkey
+   row-size=16B cardinality=1.50M
+====
+# Correlated filter on two columns
+# |join| = Ave. line items per customer
+# |lineitem| / |o.custkey| = 6M / 100K = 60
+# Bug: Expected cardinality ~60
+select c.c_name, o.o_orderkey, i.l_linenumber
+from tpch.customer c,
+     tpch.orders o,
+     tpch.lineitem i
+where c.c_custkey = o.o_custkey
+  and o.o_orderkey = i.l_orderkey
+  and c.c_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=66B cardinality=1
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_custkey = 10
+|     row-size=38B cardinality=1
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: i.l_orderkey = o.o_orderkey
+|  runtime filters: RF002 <- o.o_orderkey
+|  row-size=28B cardinality=58
+|
+|--01:SCAN HDFS [tpch.orders o]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o.o_custkey = 10
+|     runtime filters: RF000 -> o.o_custkey
+|     row-size=16B cardinality=15
+|
+02:SCAN HDFS [tpch.lineitem i]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF002 -> i.l_orderkey
+   row-size=12B cardinality=6.00M
+====
+# Correlated filter on two columns
+# |join| = Ave. line items per order
+# |lineitem| / |o.orderkey| = 6M / 1.5M = 4
+select c.c_name, o.o_orderkey, i.l_linenumber
+from tpch.customer c,
+     tpch.orders o,
+     tpch.lineitem i
+where c.c_custkey = o.o_custkey
+  and o.o_orderkey = i.l_orderkey
+  and o.o_orderkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = i.l_orderkey
+|  runtime filters: RF000 <- i.l_orderkey
+|  row-size=66B cardinality=4
+|
+|--02:SCAN HDFS [tpch.lineitem i]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: i.l_orderkey = 10
+|     row-size=12B cardinality=4
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  runtime filters: RF002 <- o.o_custkey
+|  row-size=54B cardinality=1
+|
+|--01:SCAN HDFS [tpch.orders o]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o.o_orderkey = 10
+|     runtime filters: RF000 -> o.o_orderkey
+|     row-size=16B cardinality=1
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   runtime filters: RF002 -> c.c_custkey
+   row-size=38B cardinality=150.00K
+====
+# Correlated filter on two pairs of columns
+# |join| = Ave. line items per order
+# |lineitem| / |o.orderkey| = 6M / 1.5M = 4
+# Bug: wrong join plan, expected (lineitem >< order) >< customer
+select c.c_name, o.o_orderkey, i.l_linenumber
+from tpch.customer c,
+     tpch.orders o,
+     tpch.lineitem i
+where c.c_custkey = o.o_custkey
+  and o.o_orderkey = i.l_orderkey
+  and o.o_orderkey = 10
+  and c.c_custkey = 20
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  runtime filters: RF000 <- o.o_custkey
+|  row-size=66B cardinality=1
+|
+|--03:HASH JOIN [INNER JOIN]
+|  |  hash predicates: i.l_orderkey = o.o_orderkey
+|  |  runtime filters: RF002 <- o.o_orderkey
+|  |  row-size=28B cardinality=1
+|  |
+|  |--01:SCAN HDFS [tpch.orders o]
+|  |     partitions=1/1 files=1 size=162.56MB
+|  |     predicates: o.o_orderkey = 10, o.o_custkey = 20
+|  |     row-size=16B cardinality=1
+|  |
+|  02:SCAN HDFS [tpch.lineitem i]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: i.l_orderkey = 10
+|     runtime filters: RF002 -> i.l_orderkey
+|     row-size=12B cardinality=4
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey = 20
+   runtime filters: RF000 -> c.c_custkey
+   row-size=38B cardinality=1
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/card-outer-join.test b/testdata/workloads/functional-planner/queries/PlannerTest/card-outer-join.test
new file mode 100644
index 0000000..d54e307
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/card-outer-join.test
@@ -0,0 +1,812 @@
+# Right outer tests
+#
+# Right outer
+# |join| = |orders| = 1.5M
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=24B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Right outer that is the inverse of a left-outer test
+# 1/3 of customers have no orders, or 50K
+# |join| = |orders|  + 50K = 1.55M
+select c.c_custkey, o.o_orderkey
+from tpch.orders o
+right outer join tpch.customer c on c.c_custkey = o.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=24B cardinality=1.50M
+|
+|--01:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+00:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> o.o_custkey
+   row-size=16B cardinality=1.50M
+====
+# Right outer join with zero-sized right table
+# Bug: Expected cardinality = 0
+select o.id, i.id
+from functional.alltypestiny i
+right outer join functional.alltypesnopart o using (id)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: i.id = o.id
+|  runtime filters: RF000 <- o.id
+|  row-size=8B cardinality=8
+|
+|--01:SCAN HDFS [functional.alltypesnopart o]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypestiny i]
+   partitions=4/4 files=4 size=460B
+   runtime filters: RF000 -> i.id
+   row-size=4B cardinality=8
+====
+# Right outer join with zero-sized left table
+select o.id, i.id
+from functional.alltypesnopart i
+right outer join functional.alltypestiny o using (id)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.id = i.id
+|  row-size=8B cardinality=8
+|
+|--00:SCAN HDFS [functional.alltypesnopart i]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+01:SCAN HDFS [functional.alltypestiny o]
+   partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
+====
+# Right outer with left predicate
+# |join| = |customers'| * avg. orders per customer = 15
+# predicate removes null rows, so the right outer is a no-op
+# Using wrong NDV so answer is 25. See IMPALA-8045, Outer Join section
+# Bug: Expected cardinality ~25
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_name = 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_name = 'foo'
+|  row-size=54B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name = 'foo'
+|     row-size=38B cardinality=1
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Right outer with broad left predicate
+# |join| = |customers'| * avg. orders per customer = 150K/3 * 15 = 750K
+# predicate removes null rows, so the right outer is a no-op
+# Bug: Expected cardinality ~750K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_name < 'foo'
+|  row-size=54B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name < 'foo'
+|     row-size=38B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Right outer with right predicate
+# |clerk| = 1K
+# sel(clerk = 'foo') = 1/1000
+# |orders'| = |orders| * sel(clerk = 'foo') = 1.5K
+# |join| = |orders'| = 1.5K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where o.o_clerk = 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  runtime filters: RF000 <- o.o_custkey
+|  row-size=51B cardinality=1.49K
+|
+|--01:SCAN HDFS [tpch.orders o]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o.o_clerk = 'foo'
+|     row-size=43B cardinality=1.49K
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   runtime filters: RF000 -> c.c_custkey
+   row-size=8B cardinality=150.00K
+====
+# Right outer with broad right predicate
+# |join| = |orders| * sel(right pred) = 500K
+# Bug: Expected cardinality ~500K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where o.o_clerk < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=51B cardinality=150.00K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: o.o_clerk < 'foo'
+   row-size=43B cardinality=150.00K
+====
+# Right outer with join predicate
+# |join| = |orders| * sel(pred) = 150K
+# Bug: Expected cardinality ~150K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority = 10
+|  row-size=30B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=20B cardinality=1.50M
+====
+# Right outer with broad join predicate
+# sel(c.c_nationkey + o.o_shippriority < 10) = 0.33
+# |join| = |orders| * sel(pred) = 500K
+# Bug: Expected cardinality ~500K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority < 10
+|  row-size=30B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=20B cardinality=1.50M
+====
+# Right outer with left, right and join predicate
+# |customers'| = |customers| * sel(left pred) = 50K
+# |orders'| = |orders| * sel(right pred) = 500K
+# max(|key|) = 150K / 3 = 50K
+# sel(join pred) = 0.1
+# |join| = |orders'| * |customers'| / max(|key|) * sel(join pred)
+# = 500K * 50K / 50K * .1 = 50K
+# Since inner has more keys than outer, outer join adds no rows
+# The c.c_name < 'foo' is reapplied after the join
+# Bug: Expected cardinality ~50K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority = 10
+  and o.o_clerk < 'foo'
+  and c.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority = 10, c.c_name < 'foo'
+|  row-size=87B cardinality=150.00K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name < 'foo'
+|     row-size=40B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: o.o_clerk < 'foo'
+   row-size=47B cardinality=150.00K
+====
+# Left outer tests
+#
+# Left outer
+# |join| = |orders| + unmatched customers = 1.55M
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+left outer join tpch.orders o on c.c_custkey = o.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=24B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> o.o_custkey
+   row-size=16B cardinality=1.50M
+====
+# Left outer join which is the inverse of the right outer test
+# |join| = |orders| = 1.5M
+select c.c_custkey, o.o_orderkey
+from tpch.orders o
+left outer join tpch.customer c on c.c_custkey = o.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=24B cardinality=1.50M
+|
+|--01:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+00:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Left outer join with zero-sized right table
+select o.id, i.id
+from functional.alltypestiny i
+left outer join functional.alltypesnopart o using (id)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: i.id = o.id
+|  row-size=8B cardinality=8
+|
+|--01:SCAN HDFS [functional.alltypesnopart o]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypestiny i]
+   partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
+====
+# Left outer join with zero-sized left table
+select o.id, i.id
+from functional.alltypesnopart i
+left outer join functional.alltypestiny o using (id)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o.id = i.id
+|  runtime filters: RF000 <- i.id
+|  row-size=8B cardinality=0
+|
+|--00:SCAN HDFS [functional.alltypesnopart i]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+01:SCAN HDFS [functional.alltypestiny o]
+   partitions=4/4 files=4 size=460B
+   runtime filters: RF000 -> o.id
+   row-size=4B cardinality=8
+====
+# Left outer with left predicate
+# |customers'| = |customers| * sel(pred) = 50K
+# |join| = |customers'| * avg. orders per customer = 50K * 15 = 750K
+# (Skipping some steps in the math.)
+# Bug: Expected cardinality ~750K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+left outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=54B cardinality=228.68K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name < 'foo'
+|     row-size=38B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> o.o_custkey
+   row-size=16B cardinality=1.50M
+====
+# Left outer with right predicate
+# The filter on the right table works with non-null values.
+# It is applied again after the (outer) join, and will remove any
+# null rows created via the outer join (undoing the join.)
+# While this may be a meaningless query, it is an interesting odd test case.
+# The meaning is thus to match customers who have orders.
+# |join| = |orders'| = |orders| * sel(pred)
+#        = 1.5M * 0.33 = 500K
+# Bug: Expected cardinality ~500K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+left outer join tpch.orders o on c.c_custkey = o.o_custkey
+where o.o_clerk < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: o.o_clerk < 'foo'
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=51B cardinality=150.00K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: o.o_clerk < 'foo'
+   runtime filters: RF000 -> o.o_custkey
+   row-size=43B cardinality=150.00K
+====
+# Left outer with right predicate
+# Predicate matches both right columns and the generated, null
+# outer join rows.
+# Bug: Invalid IS NULL logic uses orders null count to estimate output of outer
+# join. Since orders has no nulls, we incorrectly assume the outer join does not either.
+# See IMPALA-8050
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+left outer join tpch.orders o on c.c_custkey = o.o_custkey
+where o.o_clerk is null
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: o.o_clerk IS NULL
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=51B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> o.o_custkey
+   row-size=43B cardinality=1.50M
+====
+# Left outer with join predicate
+# sel(c.c_nationkey + o.o_shippriority = 10) = 0.1 (guess)
+# |join| = |orders| * sel(pred) = 150K
+# Bug: Expected cardinality ~150K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority = 10
+|  row-size=30B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=20B cardinality=1.50M
+====
+# As above, but with two predicates since the estimate above happens
+# to also be the customer table cardinality.
+# Remember exponential back-off
+# |join| = 0.31 above, 0.031 net, or ~45K
+# Bug: Expected cardinality ~50K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority = 10
+  and c.c_nationkey * o.o_shippriority = 100
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey * o.o_shippriority = 100, c.c_nationkey + o.o_shippriority = 10
+|  row-size=30B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=20B cardinality=1.50M
+====
+# Left outer with broad join predicate
+# |join| = |orders| * sel(pred) = 500K
+# (Skipping some math steps.)
+# Bug: Expected cardinality ~500K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+right outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority < 10
+|  row-size=30B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=20B cardinality=1.50M
+====
+# Left outer with broad join predicate and right table predicate
+# sel(c.c_nationkey + o.o_shippriority < 10) = 0.33
+# |customers'| = |customers| * 0.33 = 50K
+# |orders'| = |orders| * 0.33 = 500K
+# |join| = same as inner join = 50K
+# Order key < customer key, so no extra rows from outer
+# Bug: Expected cardinality ~50K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+left outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority = 10
+  and o.o_clerk < 'foo'
+  and c.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority = 10, o.o_clerk < 'foo'
+|  runtime filters: RF000 <- c.c_custkey
+|  row-size=87B cardinality=150.00K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name < 'foo'
+|     row-size=40B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: o.o_clerk < 'foo'
+   runtime filters: RF000 -> o.o_custkey
+   row-size=47B cardinality=150.00K
+====
+# Full outer join tests
+#
+# |join| = |left-only| + |inner-join| + |right-only|
+# |customers.c_custKey| = 150K
+# |orders.o_custkey| = 100K
+# |custkey| = max(above) = 150K
+# |left-only| = |left| * (1 - |o_custKey| / |custkey|)
+#             = 150K * (1 - 100K/150K) = 150K * 1/3 = 50K
+# |right-only| = |right| * (1 - |c_custKey| / |custkey|) = 0
+# |inner-join| = |left| * |right| / |custKey| = 1.5M
+# |join| = |left-only| + |inner-join| + |right-only|
+#        = 50K + 1.5M + 0 = 1.55M
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=24B cardinality=1.65M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Inverse of the above
+select c.c_custkey, o.o_orderkey
+from tpch.orders o
+full outer join tpch.customer c on c.c_custkey = o.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=24B cardinality=1.65M
+|
+|--01:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+00:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Full outer join with zero-sized right table
+select o.id, i.id
+from functional.alltypestiny i
+full outer join functional.alltypesnopart o using (id)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: i.id = o.id
+|  row-size=8B cardinality=8
+|
+|--01:SCAN HDFS [functional.alltypesnopart o]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+00:SCAN HDFS [functional.alltypestiny i]
+   partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
+====
+# Full outer join with zero-sized left table
+select o.id, i.id
+from functional.alltypesnopart i
+full outer join functional.alltypestiny o using (id)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.id = i.id
+|  row-size=8B cardinality=8
+|
+|--00:SCAN HDFS [functional.alltypesnopart i]
+|     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
+|
+01:SCAN HDFS [functional.alltypestiny o]
+   partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
+====
+# Full outer with left predicate
+# predicate removes null rows, so the right outer is a no-op
+# Using wrong NDV so answer is 25. See IMPALA-8045, Outer Join section
+# Bug: Expected cardinality ~25
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_name = 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_name = 'foo'
+|  row-size=54B cardinality=1.50M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name = 'foo'
+|     row-size=38B cardinality=1
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Full outer with broad left predicate
+# |join| = |customers'| * avg. orders per customer = 150K/3 * 15 = 750K
+# predicate removes null rows, so the right outer is a no-op
+# Bug: Using wrong NDV, so get answer of 1M, see IMPALA-8045
+# Bug: Expected cardinality ~1M
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_name < 'foo'
+|  row-size=54B cardinality=1.51M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name < 'foo'
+|     row-size=38B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=16B cardinality=1.50M
+====
+# Full outer with right predicate
+# |clerk| = 1K
+# sel(clerk = 'foo') = 1/1000
+# |orders'| = |orders| * sel(clerk = 'foo') = 1.5K
+# |join| = |orders'| = 1.5K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+where o.o_clerk = 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  other predicates: o.o_clerk = 'foo'
+|  row-size=51B cardinality=151.49K
+|
+|--01:SCAN HDFS [tpch.orders o]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o.o_clerk = 'foo'
+|     row-size=43B cardinality=1.49K
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   row-size=8B cardinality=150.00K
+====
+# Full outer with broad right predicate
+# |join| = |orders| * sel(right pred) = 500K
+# Bug: Expected cardinality ~500K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+where o.o_clerk < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: o.o_clerk < 'foo'
+|  row-size=51B cardinality=300.00K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=8B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: o.o_clerk < 'foo'
+   row-size=43B cardinality=150.00K
+====
+# Full outer with join predicate
+# |join| = |orders| * sel(pred) = 150K
+# Bug: Expected cardinality ~150K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority = 10
+|  row-size=30B cardinality=1.65M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=20B cardinality=1.50M
+====
+# Full outer with broad join predicate
+# sel(c.c_nationkey + o.o_shippriority < 10) = 0.33
+# |join| = |orders| * sel(pred) = 500K
+# Bug: Expected cardinality ~500K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority < 10
+|  row-size=30B cardinality=1.65M
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     row-size=10B cardinality=150.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   row-size=20B cardinality=1.50M
+====
+# Full outer with left, right and join predicate
+# |customers'| = |customers| * sel(left pred) = 50K
+# |orders'| = |orders| * sel(right pred) = 500K
+# |join| = |orders'| * sel(join pred) * sel(children) = 16K
+# sel(children) = 0.33
+# The c.c_name < 'foo' is reapplied after the join
+# Bug: Expected cardinality ~50K
+select c.c_custkey, o.o_orderkey
+from tpch.customer c
+full outer join tpch.orders o on c.c_custkey = o.o_custkey
+where c.c_nationkey + o.o_shippriority = 10
+  and o.o_clerk < 'foo'
+  and c.c_name < 'foo'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  other predicates: c.c_nationkey + o.o_shippriority = 10, o.o_clerk < 'foo', c.c_name < 'foo'
+|  row-size=87B cardinality=165.00K
+|
+|--00:SCAN HDFS [tpch.customer c]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c.c_name < 'foo'
+|     row-size=40B cardinality=15.00K
+|
+01:SCAN HDFS [tpch.orders o]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: o.o_clerk < 'foo'
+   row-size=47B cardinality=150.00K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/card-scan.test b/testdata/workloads/functional-planner/queries/PlannerTest/card-scan.test
new file mode 100644
index 0000000..fb21e84
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/card-scan.test
@@ -0,0 +1,442 @@
+# Predicate tests
+# In this file, "inequality" does not mean "not equals". It is a general
+# term for <, <=, >, >=. The term "not equals" is used for the
+# != (AKA <>) operator.
+# No predicate at all. Establishes cardinality baseline.
+select * from tpch.customer
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer]
+   partitions=1/1 files=1 size=23.08MB
+   row-size=218B cardinality=150.00K
+====
+# Predicate on a single value: card = |T|/ndv
+# Unique key, NDV=|T|
+select *
+from tpch.customer c
+where c.c_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey = 10
+   row-size=218B cardinality=1
+====
+# Predicate on a single value: card = |T|/ndv
+# Non-unique key, NDV=25
+select *
+from tpch.customer c
+where c.c_nationkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_nationkey = 10
+   row-size=218B cardinality=6.00K
+====
+# OR'ed predicate, card = 2/ndv
+select *
+from tpch.customer c
+where c.c_custkey = 10 OR c.c_custkey = 20
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey IN (10, 20)
+   row-size=218B cardinality=2
+====
+# OR'ed predicate, distinct columns
+# card = max(card of each OR term)
+select *
+from tpch.customer c
+where c.c_custkey = 10
+   or c.c_nationkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey = 10 OR c.c_nationkey = 10
+   row-size=218B cardinality=6.00K
+====
+# As above, but with smaller table to see effect
+# NDV(id) = |T| = 8
+# NDV(bool_val) = 2
+# card = max(|T|/NDV(id),|T|/NDV(bool_val)) = max(1, 4)
+# But done by adding cardinalities, which seems right, but is wrong
+# Bug: IMPALA-8038
+select *
+from functional.alltypestiny t
+where t.id = 10
+   or t.bool_col = true
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypestiny t]
+   partitions=4/4 files=4 size=460B
+   predicates: t.id = 10 OR t.bool_col = TRUE
+   row-size=89B cardinality=5
+====
+# IN, card = x/ndv
+select *
+from tpch.customer c
+where c.c_custkey in (10, 20, 30)
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey IN (10, 20, 30)
+   row-size=218B cardinality=3
+====
+# IN with duplicate values. Remove dups.
+# Bug: IMPALA-8030
+select *
+from tpch.customer c
+where c.c_custkey in (10, 20, 30, 30, 10, 20)
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey IN (10, 20, 30, 30, 10, 20)
+   row-size=218B cardinality=6
+====
+# OR on same value: card = 1/ndv
+select *
+from tpch.customer c
+where c.c_custkey = 10 OR c.c_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey = 10
+   row-size=218B cardinality=1
+====
+# OR on same value: card = 1/ndv
+# Different expression order
+# Bug: IMPALA-8030
+select *
+from tpch.customer c
+where c.c_custkey = 10 OR 10 = c.c_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey IN (10, 10)
+   row-size=218B cardinality=2
+====
+# AND'ed predicate, card = 0
+select *
+from tpch.customer c
+where c.c_custkey = 10 AND c.c_custkey = 20
+---- PLAN
+PLAN-ROOT SINK
+|
+00:EMPTYSET
+====
+# AND on same value: card = 1/ndv
+select *
+from tpch.customer c
+where c.c_custkey = 10 AND c.c_custkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey = 10
+   row-size=218B cardinality=1
+====
+# Not-equal, card = 1 - 1/ndv
+# Use smaller table so effect is clear
+# |T|=8, NDV=8
+# Bug: IMPALA-8039
+# Bug, expected cardinality ~7
+select *
+from functional.alltypestiny
+where id != 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypestiny]
+   partitions=4/4 files=4 size=460B
+   predicates: id != 10
+   row-size=89B cardinality=1
+====
+# Inequality. No useful stats.
+# Bug: IMPALA-8037, Assumes sel = 0.1
+# Bug: Expected cardinality ~49.5K
+select *
+from tpch.customer c
+where c.c_custkey < 1234
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey < 1234
+   row-size=218B cardinality=15.00K
+====
+# Inequality twice on same value. Remove duplicate.
+# Bug: IMPALA-8037
+# Bug: Expected cardinality ~49.5K
+select *
+from tpch.customer c
+where c.c_custkey < 1234
+  and c.c_custkey < 1234
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey < 1234
+   row-size=218B cardinality=15.00K
+====
+# Inequality twice on same value, but reversed "direction". Remove duplicate.
+# Bug: IMPALA-8037
+# Bug: Expected cardinality ~49.5K
+select *
+from tpch.customer c
+where c.c_custkey < 1234
+  and 1234 > c.c_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey < 1234
+   row-size=218B cardinality=15.00K
+====
+# Two inequalities of the same "direction". Assume only one applies.
+# Bugs: IMPALA-8031, IMPALA-8037
+# Bug: Expected cardinality ~28.5K
+select *
+from tpch.customer c
+where c.c_custkey < 1234
+  and c.c_custkey < 2345
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey < 1234, c.c_custkey < 2345
+   row-size=218B cardinality=15.00K
+====
+# Two inequalities. No useful stats.
+# But with effect of exponential-backoff
+# Bug: IMPALA-8037
+# Bug: Expected cardinality ~28.5K
+select *
+from tpch.customer c
+where c.c_custkey < 1234
+  and c.c_nationkey < 100
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey < 1234, c.c_nationkey < 100
+   row-size=218B cardinality=15.00K
+====
+# Between. No useful stats. Should assume, say 0.16
+# But, gets rewritten before computing selectivity,
+# So same as AND'ed inequalities
+# sel = 0.1, but Ramakrishnan and Gehrke suggest 1/4
+# Bug: IMPALA-8037
+# Bug: Expected cardinality ~28.5K
+select *
+from tpch.customer c
+where c.c_custkey between 1234 and 2345
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey <= 2345, c.c_custkey >= 1234
+   row-size=218B cardinality=15.00K
+====
+# Compound inequality (< and >) that is the same as BETWEEN.
+# Gets computed as AND of inequalities.
+# sel = 0.1 (single sel for all)
+# But, these conditions are not independent, so a better estimate
+# would be, say 1/6
+# Bugs: TBD
+# Bug: Expected cardinality ~28.5K
+# |<<< <<< ===| c > x
+# |=== >>> >>>| c < y
+# |<<< === >>>| c > x AND c < y
+select *
+from tpch.customer c
+where c.c_custkey >= 1234 and c.c_custkey <= 2345
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey <= 2345, c.c_custkey >= 1234
+   row-size=218B cardinality=15.00K
+====
+# Between and redundant inequality
+# Bug: expected cardinality ~15K
+select *
+from tpch.customer c
+where c.c_custkey between 1234 and 2345
+  and c.c_custkey <= 2345
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_custkey <= 2345, c.c_custkey >= 1234
+   row-size=218B cardinality=15.00K
+====
+# Partitioned scan.
+# |table| = 11K
+# |partition| = 1000
+# Five partitions match
+# |table'| = 5000
+select *
+from functional.alltypesagg a
+where a.day >= 6
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesagg a]
+   partition predicates: a.`day` >= 6
+   partitions=5/11 files=5 size=372.38KB
+   row-size=95B cardinality=5.00K
+====
+# Partitioned table, one partition matches
+select *
+from functional.alltypesagg a
+where a.day = 6
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesagg a]
+   partition predicates: a.`day` = 6
+   partitions=1/11 files=1 size=74.48KB
+   row-size=95B cardinality=1.00K
+====
+# Partitioned table, no partitions match
+select *
+from functional.alltypesagg a
+where a.day = 23
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesagg a]
+   partition predicates: a.`day` = 23
+   partitions=0/11 files=0 size=0B
+   row-size=95B cardinality=0
+====
+# IS NULL
+# Estimated from null count, which is 0 for TPCH tables.
+# TODO: Add tests for ndv stats but no null counts
+# Impala does not support NOT NULL fields, so we have to assume the
+# field can be null. That means null is one of the distinct values.
+# sel(mktsegment is null) = 1/NDV = 0.2
+# Bug: Many, expected cardinality ~15K
+select *
+from tpch.customer c
+where c.c_mktsegment is null
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_mktsegment IS NULL
+   row-size=218B cardinality=15.00K
+====
+# IS NOT NULL
+# Similar to above.
+# Bug: Expected cardinality ~15K
+select *
+from tpch.customer c
+where c.c_mktsegment is not null
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: c.c_mktsegment IS NOT NULL
+   row-size=218B cardinality=15.00K
+====
+# |alltypesagg| = 11K. Null count of tinyint_col = 2000.
+select *
+from functional.alltypesagg
+where tinyint_col is null
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesagg]
+   partitions=11/11 files=11 size=814.73KB
+   predicates: tinyint_col IS NULL
+   row-size=95B cardinality=2.00K
+====
+# As above.
+select *
+from functional.alltypesagg
+where tinyint_col is not null
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesagg]
+   partitions=11/11 files=11 size=814.73KB
+   predicates: tinyint_col IS NOT NULL
+   row-size=95B cardinality=9.00K
+====
+# IS NULL on an expression. Guess 0.1 selectivity
+select *
+from tpch.customer c
+where concat(c.c_mktsegment, c_comment) is null
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: concat(c.c_mktsegment, c_comment) IS NULL
+   row-size=218B cardinality=15.00K
+====
+# IS NOT NULL.
+# Bug: Should guess 0.9 selectivity, actually guesses 0.1
+# Expected cardinality ~15K
+select *
+from tpch.customer c
+where concat(c.c_mktsegment, c_comment) is not null
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [tpch.customer c]
+   partitions=1/1 files=1 size=23.08MB
+   predicates: concat(c.c_mktsegment, c_comment) IS NOT NULL
+   row-size=218B cardinality=15.00K
+====
+# Scan of a table with no stats and zero rows
+select *
+from functional.alltypesnopart
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesnopart]
+   partitions=1/1 files=0 size=0B
+   row-size=72B cardinality=0
+====
+# Filter on the no-stats table
+select *
+from functional.alltypesnopart
+where int_col = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesnopart]
+   partitions=1/1 files=0 size=0B
+   predicates: int_col = 10
+   row-size=72B cardinality=0
+====


[impala] 06/06: IMPALA-7999: clean up start-*d.sh scripts

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f9ced753badae46fe2f23cd9aa4383f1bf6844b3
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Jan 23 17:12:49 2019 -0800

    IMPALA-7999: clean up start-*d.sh scripts
    
    Delete these wrapper scripts and replace with a generic
    start-daemon.sh script that sets environment variables
    without the other logic.
    
    Move the logic for setting JAVA_TOOL_OPTIONS into
    start-impala-cluster.py.
    
    Remove some options like -jvm_suspend, -gdb, -perf that
    may not be used. These can be reintroduced if needed.
    
    Port across the kerberized minicluster logic (which has
    probably bitrotted) in case it needs to be revived.
    
    Remove --verbose option that didn't appear to be useful
    (it claims to print daemon output to the console,
    but output is still redirected regardless).
    
    Removed a level of quoting in custom cluster test argument
    handling - this was made unnecessary by properly escaping
    arguments with pipes.escape() in run_daemon().
    
    Testing:
    * Ran exhaustive tests.
    * Ran on CentOS 6 to confirm we didn't reintroduce Popen issue
      worked around by kwho.
    
    Change-Id: Ib67444fd4def8da119db5d3a0832ef1de15b068b
    Reviewed-on: http://gerrit.cloudera.org:8080/12271
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/start-catalogd.sh                     |  75 -----------------
 bin/start-daemon.sh                       |  33 ++++++++
 bin/start-impala-cluster.py               | 129 +++++++++++++++---------------
 bin/start-impalad.sh                      | 103 ------------------------
 bin/start-statestored.sh                  |  64 ---------------
 tests/common/custom_cluster_test_suite.py |   2 +-
 tests/common/impala_cluster.py            |  66 ++++++++++-----
 tests/custom_cluster/test_breakpad.py     |   4 +-
 tests/custom_cluster/test_redaction.py    |  10 +--
 tests/custom_cluster/test_scratch_disk.py |  10 +--
 10 files changed, 157 insertions(+), 339 deletions(-)

diff --git a/bin/start-catalogd.sh b/bin/start-catalogd.sh
deleted file mode 100755
index ec1fe5c..0000000
--- a/bin/start-catalogd.sh
+++ /dev/null
@@ -1,75 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts up a Catalog Service with the specified command line arguments. An optional
-# -build_type parameter can be passed to determine the build type to use.
-
-set -euo pipefail
-. $IMPALA_HOME/bin/report_build_error.sh
-setup_report_build_error
-
-BUILD_TYPE=latest
-CATALOGD_ARGS=""
-BINARY_BASE_DIR=${IMPALA_HOME}/be/build
-JVM_DEBUG_PORT=""
-JVM_SUSPEND="n"
-JVM_ARGS=""
-
-# Everything except for -build_type should be passed as a catalogd argument
-for ARG in $*
-do
-  case "$ARG" in
-    -build_type=debug) BUILD_TYPE=debug;;
-    -build_type=release) BUILD_TYPE=release;;
-    -build_type=latest) ;;
-    -build_type=*)
-      echo "Invalid build type. Valid values are: debug, release"
-      exit 1;;
-    -jvm_debug_port=*) JVM_DEBUG_PORT="${ARG#*=}";;
-    -jvm_suspend) JVM_SUSPEND="y";;
-    -jvm_args=*) JVM_ARGS="${ARG#*=}";;
-    -kudu_masters=*) CATALOGD_ARGS+=" ${ARG#*=}";;
-    *) CATALOGD_ARGS="${CATALOGD_ARGS} ${ARG}";;
-  esac
-done
-
-: ${JAVA_TOOL_OPTIONS=}
-# Optionally enable Java debugging.
-if [ -n "$JVM_DEBUG_PORT" ]; then
-  export JAVA_TOOL_OPTIONS="-agentlib:jdwp=transport=dt_socket,address=localhost:${JVM_DEBUG_PORT},server=y,suspend=${JVM_SUSPEND} ${JAVA_TOOL_OPTIONS}"
-fi
-# Optionally add additional JVM args.
-if [ -n "$JVM_ARGS" ]; then
-  export JAVA_TOOL_OPTIONS="${JAVA_TOOL_OPTIONS} ${JVM_ARGS}"
-fi
-
-# If Kerberized, source appropriate vars and set startup options
-if ${CLUSTER_DIR}/admin is_kerberized; then
-  . ${MINIKDC_ENV}
-  CATALOGD_ARGS="${CATALOGD_ARGS} -principal=${MINIKDC_PRINC_IMPALA_BE}"
-  CATALOGD_ARGS="${CATALOGD_ARGS} -keytab_file=${KRB5_KTNAME}"
-  CATALOGD_ARGS="${CATALOGD_ARGS} -krb5_conf=${KRB5_CONFIG}"
-  if [ "${MINIKDC_DEBUG}" = "true" ]; then
-      CATALOGD_ARGS="${CATALOGD_ARGS} -krb5_debug_file=/tmp/catalogd.krb5_debug"
-  fi
-fi
-
-. ${IMPALA_HOME}/bin/set-classpath.sh
-export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
-exec ${BINARY_BASE_DIR}/${BUILD_TYPE}/service/catalogd ${CATALOGD_ARGS}
diff --git a/bin/start-daemon.sh b/bin/start-daemon.sh
new file mode 100755
index 0000000..9df3934
--- /dev/null
+++ b/bin/start-daemon.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Runs the provided command line with the required environment variables for
+# the various daemon processes - impalad, catalogd, statestored. Used to
+# start up minicluster daemon processes.
+
+set -euo pipefail
+# If Kerberized, source appropriate vars.
+if "${CLUSTER_DIR}/admin" is_kerberized; then
+  . "${MINIKDC_ENV}"
+fi
+
+. "${IMPALA_HOME}/bin/set-classpath.sh"
+# LLVM must be on path to symbolise sanitiser stack traces.
+export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
+exec "$@"  # Replace this wrapper shell so the daemon receives signals directly.
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index b86e6c5..261022d 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -39,7 +39,8 @@ from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT,
     DEFAULT_HS2_PORT, DEFAULT_BE_PORT, DEFAULT_KRPC_PORT,
     DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT,
     DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT,
-    find_user_processes)
+    DEFAULT_CATALOGD_JVM_DEBUG_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
+    find_user_processes, run_daemon)
 
 logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(threadName)s: %(message)s",
     datefmt="%H:%M:%S")
@@ -91,8 +92,6 @@ parser.add_option("--log_dir", dest="log_dir",
                   help="Directory to store output logs to.")
 parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES,
                   help="Max number of log files before rotation occurs.")
-parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
-                  help="Prints all output to stderr/stdout.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
                    help="Set the impalad backend logging level")
 parser.add_option("--jvm_args", dest="jvm_args", default="",
@@ -128,16 +127,6 @@ options, args = parser.parse_args()
 
 IMPALA_HOME = os.environ["IMPALA_HOME"]
 KNOWN_BUILD_TYPES = ["debug", "release", "latest"]
-IMPALAD_PATH = os.path.join(IMPALA_HOME,
-    "bin/start-impalad.sh -build_type={build_type}".format(
-        build_type=options.build_type))
-STATE_STORE_PATH = os.path.join(IMPALA_HOME,
-    "bin/start-statestored.sh -build_type={build_type}".format(
-        build_type=options.build_type))
-CATALOGD_PATH = os.path.join(IMPALA_HOME,
-    "bin/start-catalogd.sh -build_type={build_type}".format(
-        build_type=options.build_type))
-MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process"
 
 # Kills have a timeout to prevent automated scripts from hanging indefinitely.
 # It is set to a high value to avoid failing if processes are slow to shut down.
@@ -157,20 +146,24 @@ def check_process_exists(binary, attempts=1):
     sleep(1)
   return False
 
-def exec_impala_process(cmd, args, stderr_log_file_path):
-  redirect_output = str()
-  if options.verbose:
-    args += " -logtostderr=1"
-  else:
-    redirect_output = "1>{stderr_log_file_path}".format(
-        stderr_log_file_path=stderr_log_file_path)
-  # TODO: it would be better to use Popen and pass in tokenised arguments rather than
-  # relying on shell tokenisation.
-  cmd = "{cmd} {args} {redirect_output} 2>&1 &".format(
-      cmd=cmd,
-      args=args,
-      redirect_output=redirect_output)
-  os.system(cmd)
+
+def run_daemon_with_options(daemon_binary, args, output_file, jvm_debug_port=None):
+  """Starts 'daemon_binary' with 'args' through run_daemon(), taking the build type
+  from the command-line options and passing JAVA_TOOL_OPTIONS in the environment.
+  'jvm_debug_port' is handed to build_java_tool_options(); 'output_file' is forwarded
+  to run_daemon() unchanged."""
+  java_tool_options = build_java_tool_options(jvm_debug_port)
+  run_daemon(daemon_binary, args,
+      build_type=options.build_type,
+      env_vars={"JAVA_TOOL_OPTIONS": java_tool_options},
+      output_file=output_file)
+
+
+def build_java_tool_options(jvm_debug_port=None):
+  """Returns the string to use as the JAVA_TOOL_OPTIONS environment variable for a
+  daemon. If 'jvm_debug_port' is not None, the JDWP agent option listening on that
+  port comes first; the --jvm_args command-line option is appended after a space."""
+  pieces = []
+  if jvm_debug_port is not None:
+    jdwp_template = ("-agentlib:jdwp=transport=dt_socket,address={debug_port}," +
+        "server=y,suspend=n ")
+    pieces.append(jdwp_template.format(debug_port=jvm_debug_port))
+  if options.jvm_args is not None:
+    pieces.append(" " + options.jvm_args)
+  return "".join(pieces)
 
 def kill_matching_processes(binary_names, force=False):
   """Kills all processes with the given binary name, waiting for them to exit"""
@@ -231,18 +224,6 @@ def build_logging_args(service_name):
   return result
 
 
-def build_jvm_args(instance_num):
-  """Return a list of command line arguments to pass to start-*.sh to configure the JVM.
-  """
-  # TODO: IMPALA-7999 - Docker build doesn't use the start-*.sh scripts that process
-  # these args. Skip generating until that is fixed.
-  if options.docker_network is not None:
-    return []
-  DEFAULT_JVM_DEBUG_PORT = 30000
-  return ["-jvm_debug_port={0}".format(DEFAULT_JVM_DEBUG_PORT + instance_num),
-          "-jvm_args={0}".format(options.jvm_args)]
-
-
 def impalad_service_name(i):
   """Return the name to use for the ith impala daemon in the cluster."""
   if i == 0:
@@ -264,7 +245,7 @@ def combine_arg_list_opts(opt_args):
 
 def build_statestored_arg_list():
   """Build a list of command line arguments to pass to the statestored."""
-  return (build_logging_args("statestored") +
+  return (build_logging_args("statestored") + build_kerberos_args("statestored") +
       combine_arg_list_opts(options.state_store_args))
 
 
@@ -272,21 +253,19 @@ def build_catalogd_arg_list():
   """Build a list of command line arguments to pass to the catalogd."""
   return (build_logging_args("catalogd") +
       ["-kudu_master_hosts", options.kudu_master_hosts] +
-      combine_arg_list_opts(options.catalogd_args) +
-      build_jvm_args(options.cluster_size))
+      build_kerberos_args("catalogd") +
+      combine_arg_list_opts(options.catalogd_args))
 
 
 def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordinators,
     remap_ports):
   """Build the argument lists for impala daemons in the cluster. Returns a list of
   argument lists, one for each impala daemon in the cluster. Each argument list is
-  simply a string that will be tokenized based on shell rules. 'num_coordinators' and
-  'use_exclusive_coordinators' allow setting up the cluster with dedicated coordinators.
-  If 'remap_ports' is true, the impalad ports are changed from their default values to
-  avoid port conflicts."""
-  # TODO: it would be better to produce a list of arguments rather than a big string
-  # blob that has to be tokenized later. However, that may require more substantial
-  # refactoring.
+  a list of strings. 'num_coordinators' and 'use_exclusive_coordinators' allow setting
+  up the cluster with dedicated coordinators.  If 'remap_ports' is true, the impalad
+  ports are changed from their default values to avoid port conflicts."""
+  # TODO: currently we build a big string blob then split it. It would be better to
+  # build up the lists directly.
 
   mem_limit_arg = ""
   if options.docker_network is None:
@@ -317,13 +296,13 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi
     param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
     args = ("{mem_limit_arg} "
         "{impala_logging_args} "
-        "{jvm_args} "
         "{impala_port_args} "
+        "{impala_kerberos_args} "
         "{param_args}").format(
             mem_limit_arg=mem_limit_arg,  # Goes first so --impalad_args will override it.
             impala_logging_args=" ".join(build_logging_args(service_name)),
-            jvm_args=" ".join(build_jvm_args(i)),
             impala_port_args=impala_port_args,
+            impala_kerberos_args=" ".join(build_kerberos_args("impalad")),
             param_args=param_args)
     if options.kudu_master_hosts:
       # Must be prepended, otherwise the java options interfere.
@@ -351,10 +330,29 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi
     if i < len(per_impalad_args):
       args = "{args} {per_impalad_args}".format(
           args=args, per_impalad_args=per_impalad_args[i])
-    impalad_args.append(args)
+    impalad_args.append(shlex.split(args))
   return impalad_args
 
 
+def build_kerberos_args(daemon):
+  """If the cluster is kerberized, returns the list of arguments to pass to the daemon
+  process, otherwise returns an empty list. 'daemon' must be one of "impalad",
+  "catalogd" or "statestored"."""
+  # Note: this code has probably bit-rotted but is preserved in case someone needs to
+  # revive the kerberized minicluster.
+  assert daemon in ("impalad", "catalogd", "statestored")
+  if call([os.path.join(IMPALA_HOME, "testdata/cluster/admin"), "is_kerberized"]) != 0:
+    return []
+  args = ["-keytab_file={0}".format(os.getenv("KRB5_KTNAME")),
+          "-krb5_conf={0}".format(os.getenv("KRB5_CONFIG"))]
+  if daemon == "impalad":
+    args += ["-principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA")),
+             "-be_principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA_BE"))]
+  else:
+    args.append("-principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA_BE")))
+  if os.getenv("MINIKDC_DEBUG", "") == "true":
+    args.append("-krb5_debug_file=/tmp/{0}.krb5_debug".format(daemon))
+  # Callers concatenate/join the result, so a list must always be returned.
+  return args
+
+
 def compute_impalad_mem_limit(cluster_size):
   # Set mem_limit of each impalad to the smaller of 12GB or
   # 1/cluster_size (typically 1/3) of 70% of system memory.
@@ -396,9 +394,8 @@ class MiniClusterOperations(object):
   def start_statestore(self):
     LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
         log_dir=options.log_dir))
-    stderr_log_file_path = os.path.join(options.log_dir, "statestore-error.log")
-    args = " ".join(build_statestored_arg_list())
-    exec_impala_process(STATE_STORE_PATH, args, stderr_log_file_path)
+    output_file = os.path.join(options.log_dir, "statestore-out.log")
+    run_daemon_with_options("statestored", build_statestored_arg_list(), output_file)
     if not check_process_exists("statestored", 10):
       raise RuntimeError("Unable to start statestored. Check log or file permissions"
                          " for more details.")
@@ -406,9 +403,9 @@ class MiniClusterOperations(object):
   def start_catalogd(self):
     LOG.info("Starting Catalog Service logging to {log_dir}/catalogd.INFO".format(
         log_dir=options.log_dir))
-    stderr_log_file_path = os.path.join(options.log_dir, "catalogd-error.log")
-    args = " ".join(build_catalogd_arg_list())
-    exec_impala_process(CATALOGD_PATH, args, stderr_log_file_path)
+    output_file = os.path.join(options.log_dir, "catalogd-out.log")
+    run_daemon_with_options("catalogd", build_catalogd_arg_list(), output_file,
+        jvm_debug_port=DEFAULT_CATALOGD_JVM_DEBUG_PORT)
     if not check_process_exists("catalogd", 10):
       raise RuntimeError("Unable to start catalogd. Check log or file permissions"
                          " for more details.")
@@ -428,9 +425,10 @@ class MiniClusterOperations(object):
       service_name = impalad_service_name(i)
       LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
           log_dir=options.log_dir, service_name=service_name))
-      stderr_log_file_path = os.path.join(
-          options.log_dir, "{service_name}-error.log".format(service_name=service_name))
-      exec_impala_process(IMPALAD_PATH, impalad_arg_lists[i], stderr_log_file_path)
+      output_file = os.path.join(
+          options.log_dir, "{service_name}-out.log".format(service_name=service_name))
+      run_daemon_with_options("impalad", impalad_arg_lists[i],
+          jvm_debug_port=DEFAULT_IMPALAD_JVM_DEBUG_PORT + i, output_file=output_file)
 
 
 class DockerMiniClusterOperations(object):
@@ -495,8 +493,8 @@ class DockerMiniClusterOperations(object):
       port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'],
                   DEFAULT_HS2_PORT: chosen_ports['hs2_port'],
                   DEFAULT_IMPALAD_WEBSERVER_PORT: chosen_ports['webserver_port']}
-      self.__run_container__("impalad_coord_exec",
-          shlex.split(impalad_arg_lists[i]), port_map, i, mem_limit=mem_limit)
+      self.__run_container__("impalad_coord_exec", impalad_arg_lists[i], port_map, i,
+          mem_limit=mem_limit)
 
   def __gen_container_name__(self, daemon, instance=None):
     """Generate the name for the container, which should be unique among containers
@@ -528,7 +526,10 @@ class DockerMiniClusterOperations(object):
                    for src, dst in port_map.iteritems()]
     # Impersonate the current user for operations against the minicluster. This is
     # necessary because the user name inside the container is "root".
-    env_args = ["-e", "HADOOP_USER_NAME={0}".format(getpass.getuser())]
+    # TODO: pass in the actual options
+    env_args = ["-e", "HADOOP_USER_NAME={0}".format(getpass.getuser()),
+                "-e", "JAVA_TOOL_OPTIONS={0}".format(
+                    build_java_tool_options(DEFAULT_IMPALAD_JVM_DEBUG_PORT))]
     # The container build processes tags the generated image with the daemon name.
     image_tag = daemon
     host_name = self.__gen_host_name__(daemon, instance)
diff --git a/bin/start-impalad.sh b/bin/start-impalad.sh
deleted file mode 100755
index dacd44c..0000000
--- a/bin/start-impalad.sh
+++ /dev/null
@@ -1,103 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts up an impalad with the specified command line arguments. An optional -build_type
-# parameter can be passed to determine the build type to use for the impalad instance.
-
-set -euo pipefail
-. $IMPALA_HOME/bin/report_build_error.sh
-setup_report_build_error
-
-BUILD_TYPE=latest
-IMPALAD_ARGS=""
-BINARY_BASE_DIR=${IMPALA_HOME}/be/build
-TOOL_PREFIX=""
-IMPALAD_BINARY=service/impalad
-BINARY=${IMPALAD_BINARY}
-JVM_DEBUG_PORT=""
-JVM_SUSPEND="n"
-JVM_ARGS=""
-PERF_ARGS=${PERF_ARGS:-"record -F 99"}
-
-for ARG in $*
-do
-  case "$ARG" in
-    -build_type=debug)
-      BUILD_TYPE=debug
-      ;;
-    -build_type=release)
-      BUILD_TYPE=release
-      ;;
-    -build_type=latest)
-      ;;
-    -build_type=*)
-      echo "Invalid build type. Valid values are: debug, release"
-      exit 1
-      ;;
-    -gdb)
-      echo "Starting Impala under gdb..."
-      TOOL_PREFIX="gdb --args"
-      ;;
-    -jvm_debug_port=*)
-      JVM_DEBUG_PORT="${ARG#*=}"
-      ;;
-    -jvm_suspend)
-      JVM_SUSPEND="y"
-      ;;
-    -jvm_args=*)
-      JVM_ARGS="${ARG#*=}"
-      ;;
-    -perf)
-      echo "Starting Impala with 'perf' tracing. Set \$PERF_ARGS to customize use."
-      TOOL_PREFIX="perf ${PERF_ARGS}"
-      ;;
-    # Pass all other options as an Impalad argument
-    *)
-      IMPALAD_ARGS="${IMPALAD_ARGS} ${ARG}"
-  esac
-done
-
-IMPALA_CMD=${BINARY_BASE_DIR}/${BUILD_TYPE}/${BINARY}
-
-# Temporarily disable unbound variable checking in case JAVA_TOOL_OPTIONS is not set.
-set +u
-# Optionally enable Java debugging.
-if [ -n "$JVM_DEBUG_PORT" ]; then
-  export JAVA_TOOL_OPTIONS="-agentlib:jdwp=transport=dt_socket,address=${JVM_DEBUG_PORT},server=y,suspend=${JVM_SUSPEND} ${JAVA_TOOL_OPTIONS}"
-fi
-# Optionally add additional JVM args.
-if [ -n "$JVM_ARGS" ]; then
-  export JAVA_TOOL_OPTIONS="${JAVA_TOOL_OPTIONS} ${JVM_ARGS}"
-fi
-
-# If Kerberized, source appropriate vars and set startup options
-if ${CLUSTER_DIR}/admin is_kerberized; then
-  . ${MINIKDC_ENV}
-  IMPALAD_ARGS="${IMPALAD_ARGS} -principal=${MINIKDC_PRINC_IMPALA}"
-  IMPALAD_ARGS="${IMPALAD_ARGS} -be_principal=${MINIKDC_PRINC_IMPALA_BE}"
-  IMPALAD_ARGS="${IMPALAD_ARGS} -keytab_file=${KRB5_KTNAME}"
-  IMPALAD_ARGS="${IMPALAD_ARGS} -krb5_conf=${KRB5_CONFIG}"
-  if [ "${MINIKDC_DEBUG}" = "true" ]; then
-      IMPALAD_ARGS="${IMPALAD_ARGS} -krb5_debug_file=/tmp/impalad.krb5_debug"
-  fi
-fi
-
-. ${IMPALA_HOME}/bin/set-classpath.sh
-export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
-exec ${TOOL_PREFIX} ${IMPALA_CMD} ${IMPALAD_ARGS}
diff --git a/bin/start-statestored.sh b/bin/start-statestored.sh
deleted file mode 100755
index b14f3fe..0000000
--- a/bin/start-statestored.sh
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Starts up the StateStored with the specified command line arguments.
-
-set -euo pipefail
-. $IMPALA_HOME/bin/report_build_error.sh
-setup_report_build_error
-
-BUILD_TYPE=latest
-STATESTORED_ARGS=""
-BINARY_BASE_DIR=${IMPALA_HOME}/be/build
-
-# Everything except for -build_type should be passed as a statestored argument
-for ARG in $*
-do
-  case "$ARG" in
-    -build_type=debug)
-      BUILD_TYPE=debug
-      ;;
-    -build_type=release)
-      BUILD_TYPE=release
-      ;;
-    -build_type=latest)
-      ;;
-    -build_type=*)
-      echo "Invalid build type. Valid values are: debug, release"
-      exit 1
-      ;;
-    *)
-      STATESTORED_ARGS="${STATESTORED_ARGS} ${ARG}"
-      ;;
-  esac
-done
-
-# If Kerberized, source appropriate vars and set startup options
-if ${CLUSTER_DIR}/admin is_kerberized; then
-  . ${MINIKDC_ENV}
-  STATESTORED_ARGS="${STATESTORED_ARGS} -principal=${MINIKDC_PRINC_IMPALA_BE}"
-  STATESTORED_ARGS="${STATESTORED_ARGS} -keytab_file=${KRB5_KTNAME}"
-  STATESTORED_ARGS="${STATESTORED_ARGS} -krb5_conf=${KRB5_CONFIG}"
-  if [ "${MINIKDC_DEBUG}" = "true" ]; then
-      STATESTORED_ARGS="${STATESTORED_ARGS} -krb5_debug_file=/tmp/statestored.krb5_debug"
-  fi
-fi
-
-export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
-exec ${BINARY_BASE_DIR}/${BUILD_TYPE}/service/statestored ${STATESTORED_ARGS}
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 7cea974..f6c7997 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -131,7 +131,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     cluster_args = list()
     for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS]:
       if arg in method.func_dict:
-        cluster_args.append("--%s=\"%s\" " % (arg, method.func_dict[arg]))
+        cluster_args.append("--%s=%s " % (arg, method.func_dict[arg]))
     if START_ARGS in method.func_dict:
       cluster_args.append(method.func_dict[START_ARGS])
 
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 246c272..751c792 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -20,6 +20,7 @@
 import json
 import logging
 import os
+import pipes
 import psutil
 import socket
 import sys
@@ -44,8 +45,7 @@ LOG = logging.getLogger('impala_cluster')
 LOG.setLevel(level=logging.DEBUG)
 
 IMPALA_HOME = os.environ['IMPALA_HOME']
-CATALOGD_PATH = os.path.join(IMPALA_HOME, 'bin/start-catalogd.sh')
-IMPALAD_PATH = os.path.join(IMPALA_HOME, 'bin/start-impalad.sh -build_type=latest')
+START_DAEMON_PATH = os.path.join(IMPALA_HOME, 'bin/start-daemon.sh')
 
 DEFAULT_BEESWAX_PORT = 21000
 DEFAULT_HS2_PORT = 21050
@@ -57,6 +57,9 @@ DEFAULT_IMPALAD_WEBSERVER_PORT = 25000
 DEFAULT_STATESTORED_WEBSERVER_PORT = 25010
 DEFAULT_CATALOGD_WEBSERVER_PORT = 25020
 
+DEFAULT_IMPALAD_JVM_DEBUG_PORT = 30000
+DEFAULT_CATALOGD_JVM_DEBUG_PORT = 30030
+
 # Timeout to use when waiting for a cluster to start up. Set quite high to avoid test
 # flakiness.
 CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
@@ -308,18 +311,6 @@ class Process(object):
     LOG.info("No PID found for process cmdline: %s. Process is dead?" % self.cmd)
     return None
 
-  def start(self):
-    if self.container_id is None:
-      LOG.info("Starting process: {0}".format(' '.join(self.cmd)))
-      # Use os.system() to start 'cmd' in the background via a shell so its parent will be
-      # init after the shell exits. Otherwise, the parent of 'cmd' will be py.test and we
-      # cannot cleanly kill it until py.test exits. In theory, Popen(shell=True) should
-      # achieve the same thing but it doesn't work on some platforms for some reasons.
-      os.system(' '.join(self.cmd) + ' &')
-    else:
-      LOG.info("Starting container: {0}".format(self.container_id))
-      check_call(["docker", "container", "start", self.container_id])
-
   def kill(self, signal=SIGKILL):
     """
     Kills the given processes.
@@ -334,6 +325,16 @@ class Process(object):
       LOG.info("Stopping container: {0}".format(self.container_id))
       check_call(["docker", "container", "stop", self.container_id])
 
+  def start(self):
+    """Start the stopped process, or its container, again with the same arguments."""
+    if self.container_id is None:
+      binary = os.path.basename(self.cmd[0])
+      restart_args = self.cmd[1:]
+      LOG.info("Starting {0} with arguments {1}".format(binary, restart_args))
+      run_daemon(binary, restart_args)
+    else:
+      LOG.info("Starting container: {0}".format(self.container_id))
+      check_call(["docker", "container", "start", self.container_id])
 
   def restart(self):
     """Kills and restarts the process"""
@@ -409,9 +410,9 @@ class ImpaladProcess(BaseImpalaProcess):
 
   def start(self, wait_until_ready=True):
     """Starts the impalad and waits until the service is ready to accept connections."""
-    restart_cmd = [IMPALAD_PATH] + self.cmd[1:] + ['&']
-    LOG.info("Starting Impalad process: %s" % ' '.join(restart_cmd))
-    os.system(' '.join(restart_cmd))
+    restart_args = self.cmd[1:]
+    LOG.info("Starting Impalad process with args: {0}".format(restart_args))
+    run_daemon("impalad", restart_args)
     if wait_until_ready:
       self.service.wait_for_metric_value('impala-server.ready',
                                          expected_value=1, timeout=30)
@@ -466,9 +467,9 @@ class CatalogdProcess(BaseImpalaProcess):
 
   def start(self, wait_until_ready=True):
     """Starts catalogd and waits until the service is ready to accept connections."""
-    restart_cmd = [CATALOGD_PATH] + self.cmd[1:] + ["&"]
-    LOG.info("Starting Catalogd process: %s" % ' '.join(restart_cmd))
-    os.system(' '.join(restart_cmd))
+    restart_args = self.cmd[1:]
+    LOG.info("Starting Catalogd process: {0}".format(restart_args))
+    run_daemon("catalogd", restart_args)
     if wait_until_ready:
       self.service.wait_for_metric_value('statestore-subscriber.connected',
                                          expected_value=1, timeout=30)
@@ -487,3 +488,28 @@ def find_user_processes(binaries):
     except psutil.NoSuchProcess, e:
       # Ignore the case when a process no longer exists.
       pass
+
+
+def run_daemon(daemon_binary, args, build_type="latest", env_vars={}, output_file=None):
+  """Starts up the Impala daemon 'daemon_binary' with the specified command line
+  arguments. 'args' must be a list of strings. The optional 'build_type' selects which
+  build of the daemon binary to run. Any values in 'env_vars' override environment
+  variables inherited from this process. If 'output_file' is specified, stdout and
+  stderr are redirected to that file; the path is shell-quoted.
+  """
+  bin_path = os.path.join(IMPALA_HOME, "be", "build", build_type, "service",
+                          daemon_binary)
+  redirect = ""
+  if output_file is not None:
+    redirect = "1>{0} 2>&1".format(pipes.quote(output_file))
+  cmd = [START_DAEMON_PATH, bin_path] + args
+  # Use os.system() to start 'cmd' in the background via a shell so its parent will be
+  # init after the shell exits. Otherwise, the parent of 'cmd' will be py.test and we
+  # cannot cleanly kill it until py.test exits. In theory, Popen(shell=True) should
+  # achieve the same thing but it doesn't work on some platforms for some reasons.
+  sys_cmd = ("{set_cmds} {cmd} {redirect} &".format(
+      set_cmds=''.join(["export {0}={1};".format(k, pipes.quote(v))
+                         for k, v in env_vars.iteritems()]),
+      cmd=' '.join([pipes.quote(tok) for tok in cmd]),
+      redirect=redirect))
+  os.system(sys_cmd)
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index 0726a2b..91f79d9 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -66,8 +66,8 @@ class TestBreakpadBase(CustomClusterTestSuite):
   def start_cluster_with_args(self, **kwargs):
     cluster_options = []
     for daemon_arg in DAEMON_ARGS:
-      daemon_options = " ".join("-%s=%s" % i for i in kwargs.iteritems())
-      cluster_options.append("""--%s='%s'""" % (daemon_arg, daemon_options))
+      daemon_options = " ".join("-{0}={1}".format(k, v) for k, v in kwargs.iteritems())
+      cluster_options.append("--{0}={1}".format(daemon_arg, daemon_options))
     self._start_impala_cluster(cluster_options)
 
   def start_cluster(self):
diff --git a/tests/custom_cluster/test_redaction.py b/tests/custom_cluster/test_redaction.py
index 3bf831e..e13d365 100644
--- a/tests/custom_cluster/test_redaction.py
+++ b/tests/custom_cluster/test_redaction.py
@@ -92,10 +92,10 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
       file.write(redaction_rules)
 
     self._start_impala_cluster(
-        ["""--impalad_args='-audit_event_log_dir=%s
-                            -profile_log_dir=%s
-                            -redaction_rules_file=%s
-                            -vmodule=%s'"""
+        ["""--impalad_args=-audit_event_log_dir=%s
+                           -profile_log_dir=%s
+                           -redaction_rules_file=%s
+                           -vmodule=%s"""
             % (self.audit_dir, self.profile_dir, self.rules_file, vmodule)],
         impala_log_dir=self.log_dir,
         log_level=log_level)
@@ -121,7 +121,7 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
     except Exception:
       if self.cluster.impalads:
         raise Exception("No impalads should have started")
-    with open(os.path.join(self.log_dir, 'impalad-error.log')) as file:
+    with open(os.path.join(self.log_dir, 'impalad-out.log')) as file:
       result = grep_file(file, expected_error_message)
     assert result, 'The expected error message was not found'
 
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index c50ba24..5640293 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -82,7 +82,7 @@ class TestScratchDir(CustomClusterTestSuite):
         scratch because all directories are on same disk."""
     normal_dirs = self.generate_dirs(5)
     self._start_impala_cluster([
-      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(','.join(normal_dirs)),
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=false'])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=1)
@@ -96,7 +96,7 @@ class TestScratchDir(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_no_dirs(self, vector):
     """ Test we can execute a query with no scratch dirs """
-    self._start_impala_cluster(['--impalad_args="-logbuflevel=-1 -scratch_dirs="'])
+    self._start_impala_cluster(['--impalad_args=-logbuflevel=-1 -scratch_dirs='])
     self.assert_impalad_log_contains("WARNING",
         "Running without spill to disk: no scratch directories provided\.")
     exec_option = vector.get_value('exec_option')
@@ -113,7 +113,7 @@ class TestScratchDir(CustomClusterTestSuite):
     """ Test we can execute a query with only bad non-writable scratch """
     non_writable_dirs = self.generate_dirs(5, writable=False)
     self._start_impala_cluster([
-      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(
       ','.join(non_writable_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
@@ -135,7 +135,7 @@ class TestScratchDir(CustomClusterTestSuite):
     """ Test that non-existing directories are not created or used """
     non_existing_dirs = self.generate_dirs(5, non_existing=True)
     self._start_impala_cluster([
-      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(
       ','.join(non_existing_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
@@ -159,7 +159,7 @@ class TestScratchDir(CustomClusterTestSuite):
         have permissions changed or are removed after impalad startup."""
     dirs = self.generate_dirs(3);
     self._start_impala_cluster([
-      '--impalad_args="-logbuflevel=-1 -scratch_dirs={0}"'.format(','.join(dirs)),
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(dirs))