Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/21 04:35:24 UTC

[impala] branch master updated (09182c8 -> b3b00da)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 09182c8  Backport KUDU-2871 (part 1): disable TLS 1.3.
     new 65175a2  IMPALA-8443: Record time spent in authorization in the runtime profile
     new 8431a95  IMPALA-7534. Handle invalidation races in CatalogdMetaProvider
     new 67cd55e  IMPALA-7802: Close connections of idle client sessions
     new b3b00da  IMPALA-7608: Estimate row count from file size when no stats available

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/catalog-op-executor.cc                 |   7 +
 be/src/rpc/TAcceptQueueServer.cpp                  |  63 ++-
 be/src/rpc/TAcceptQueueServer.h                    |   8 +-
 be/src/rpc/thrift-server.cc                        |  71 +--
 be/src/rpc/thrift-server.h                         | 115 +++-
 be/src/rpc/thrift-util.cc                          |  19 +-
 be/src/rpc/thrift-util.h                           |   7 +-
 be/src/runtime/client-cache.h                      |   4 +-
 be/src/service/impala-server.cc                    |  57 +-
 be/src/service/impala-server.h                     |   5 +
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |  12 +-
 common/thrift/ImpalaInternalService.thrift         |   4 +
 common/thrift/ImpalaService.thrift                 |   4 +
 .../apache/impala/analysis/AnalysisContext.java    |   4 +-
 .../impala/authorization/AuthorizationChecker.java |   6 +-
 .../impala/authorization/AuthorizationContext.java |  15 +-
 .../authorization/BaseAuthorizationChecker.java    |  19 +-
 .../authorization/NoopAuthorizationFactory.java    |   6 +-
 .../ranger/RangerAuthorizationChecker.java         |  10 +-
 .../ranger/RangerAuthorizationContext.java         |   6 +-
 .../sentry/SentryAuthorizationChecker.java         |   6 +-
 .../org/apache/impala/catalog/HdfsCompression.java |   2 +-
 .../impala/catalog/local/CatalogdMetaProvider.java | 158 +++++-
 .../org/apache/impala/planner/HdfsScanNode.java    | 118 +++-
 .../java/org/apache/impala/service/Frontend.java   |   3 +-
 .../java/org/apache/impala/util/EventSequence.java |   7 +-
 .../catalog/local/CatalogdMetaProviderTest.java    |  78 +++
 .../org/apache/impala/common/FrontendTestBase.java |   7 +-
 .../org/apache/impala/planner/CardinalityTest.java | 629 ++++++++++++++++++++-
 .../org/apache/impala/planner/PlannerTest.java     |  83 ++-
 .../org/apache/impala/planner/PlannerTestBase.java |   5 +-
 ...tr-mode-shuffle-hdfs-num-rows-est-enabled.test} |  28 +-
 ...k-join-detection-hdfs-num-rows-est-enabled.test |  86 +++
 .../joins-hdfs-num-rows-est-enabled.test           | 574 +++++++++++++++++++
 .../queries/PlannerTest/joins.test                 |  88 +++
 ...-runtime-filters-hdfs-num-rows-est-enabled.test |  59 ++
 ...t-dop-validation-hdfs-num-rows-est-enabled.test | 281 +++++++++
 .../PlannerTest/spillable-buffer-sizing.test       |   4 +-
 ...subquery-rewrite-hdfs-num-rows-est-enabled.test |  31 +
 .../QueryTest/admission-max-min-mem-limits.test    |  14 +
 .../queries/QueryTest/explain-level2.test          |  14 +
 .../queries/QueryTest/inline-view.test             |  31 +
 .../queries/QueryTest/runtime_row_filters.test     |  13 +
 .../functional-query/queries/QueryTest/set.test    |   3 +
 .../queries/QueryTest/stats-extrapolation.test     |  33 +-
 tests/custom_cluster/test_hs2.py                   |   4 +-
 tests/custom_cluster/test_local_catalog.py         |  39 +-
 tests/custom_cluster/test_session_expiration.py    |  70 ++-
 tests/observability/test_log_fragments.py          |   3 +-
 tests/query_test/test_observability.py             |  19 +-
 51 files changed, 2757 insertions(+), 179 deletions(-)
 copy testdata/workloads/functional-planner/queries/PlannerTest/{default-join-distr-mode-broadcast.test => default-join-distr-mode-shuffle-hdfs-num-rows-est-enabled.test} (71%)
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/joins-hdfs-num-rows-est-enabled.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite-hdfs-num-rows-est-enabled.test


[impala] 02/04: IMPALA-7534. Handle invalidation races in CatalogdMetaProvider

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8431a95698b6f687ac8862cc6549e1949af0b034
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Mon Jun 17 16:17:27 2019 -0700

    IMPALA-7534. Handle invalidation races in CatalogdMetaProvider
    
    This handles a race condition in which a cache invalidation concurrent
    with a cache load would potentially be skipped, causing out-of-date data
    to persist in the cache. This would present itself as spurious "table
    not found" errors.
    
    A new test case triggers the issue reliably by injecting latency into
    the metadata fetch RPC and running DDLs concurrently on the same
    database across 8 threads. With the fix, the test passes reliably.
    
    Another option to fix this might have been to switch to Caffeine instead
    of Guava's loading cache. However, Caffeine requires Java 8, and
    LocalCatalog is being backported to Impala 2.x which still can run on
    Java 7. So, working around the Guava issue will make backporting (and
    future backports) easier.
    
    Change-Id: I70f377db88e204825a909389f28dc3451815235c
    Reviewed-on: http://gerrit.cloudera.org:8080/13664
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/catalog-op-executor.cc                 |   7 +
 .../impala/catalog/local/CatalogdMetaProvider.java | 158 ++++++++++++++++++---
 .../catalog/local/CatalogdMetaProviderTest.java    |  78 ++++++++++
 tests/custom_cluster/test_local_catalog.py         |  39 ++++-
 4 files changed, 260 insertions(+), 22 deletions(-)
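
Note for readers: the Future-in-cache scheme described in the commit message
can be illustrated with a minimal Python sketch. It is not the Java code in
this patch. The class FutureCache and its methods load() and invalidate() are
hypothetical names, and a lock-guarded dict stands in for Guava's cache.

```python
import threading
from concurrent.futures import Future


class FutureCache:
    """Toy stand-in for the real cache: a dict guarded by a lock (assumption)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}

    def invalidate(self, key):
        # Removes whatever is present: a loaded value or an in-flight Future.
        with self._lock:
            self._entries.pop(key, None)

    def load(self, key, loader):
        mine = Future()
        with self._lock:
            # Atomically return the existing entry or install our placeholder.
            entry = self._entries.setdefault(key, mine)
        if not isinstance(entry, Future):
            return entry            # Cached: a plain value, i.e. a hit.
        if entry is not mine:
            return entry.result()   # Loading: piggy-back on the other loader.
        try:
            value = loader()        # We are the only loader for this key.
        except Exception as e:
            with self._lock:
                if self._entries.get(key) is mine:
                    del self._entries[key]  # Let later loads retry.
            mine.set_exception(e)   # Piggy-backed callers see the error too.
            raise
        mine.set_result(value)
        with self._lock:
            # Swap in the plain value only if our Future is still the entry.
            # If an invalidate removed it meanwhile, the stale value is dropped.
            if self._entries.get(key) is mine:
                self._entries[key] = value
        return value
```

In this sketch, an invalidate() that lands while loader() is running removes
the placeholder, so the stale result is returned to the original caller but
never stored, and the next load() fetches fresh data.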

diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index e820b0a..3e07613 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -31,6 +31,7 @@
 #include "util/runtime-profile-counters.h"
 #include "util/string-parser.h"
 #include "util/test-info.h"
+#include "util/time.h"
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/CatalogService_types.h"
 #include "gen-cpp/CatalogObjects_types.h"
@@ -48,6 +49,9 @@ DECLARE_bool(use_local_catalog);
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
 
+DEFINE_int32_hidden(inject_latency_after_catalog_fetch_ms, 0,
+    "Latency (ms) to be injected after fetching catalog data from the catalogd");
+
 Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
   Status status;
   DCHECK(profile_ != NULL);
@@ -296,6 +300,9 @@ Status CatalogOpExecutor::GetPartialCatalogObject(
   RETURN_IF_ERROR(status);
   RETURN_IF_ERROR(
       client.DoRpc(&CatalogServiceClientWrapper::GetPartialCatalogObject, req, resp));
+  if (FLAGS_inject_latency_after_catalog_fetch_ms > 0) {
+    SleepForMs(FLAGS_inject_latency_after_catalog_fetch_ms);
+  }
   return Status::OK();
 }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index a15d889..9a36bb4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -26,8 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -52,7 +55,6 @@ import org.apache.impala.catalog.Principal;
 import org.apache.impala.catalog.PrincipalPrivilege;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.common.Reference;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.thrift.CatalogLookupStatus;
@@ -99,6 +101,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.errorprone.annotations.Immutable;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 
@@ -232,7 +235,34 @@ public class CatalogdMetaProvider implements MetaProvider {
   // to the "direct" provider for now and circumvent catalogd.
   private DirectMetaProvider directProvider_ = new DirectMetaProvider();
 
+  /**
+   * Number of requests which piggy-backed on a concurrent request for the same key,
+   * and resulted in success. Used only for test assertions.
+   */
+  @VisibleForTesting
+  final AtomicInteger piggybackSuccessCountForTests = new AtomicInteger();
+
+  /**
+   * Number of requests which piggy-backed on a concurrent request for the same key,
+   * and resulted in an exception. Used only for test assertions.
+   */
+  @VisibleForTesting
+  final AtomicInteger piggybackExceptionCountForTests = new AtomicInteger();
+
+  /**
+   * The underlying cache.
+   *
+   * The keys in this cache are various types of objects (strings, DbCacheKey, etc).
+   * The values are also variant depending on the type of cache key. While any key
+   * is being loaded, it is a Future<T>, which gets replaced with a non-wrapped object
+   * once it is successfully loaded (see {@link #getIfPresent(Object)} for a convenient
+   * wrapper).
+   *
+   * For details of the usage of Futures within the cache, see
+   * {@link #loadWithCaching(String, String, Object, Callable)}.
+   *
 
+   */
   final Cache<Object,Object> cache_;
 
   /**
@@ -393,29 +423,102 @@ public class CatalogdMetaProvider implements MetaProvider {
   private <CacheKeyType, ValueType> ValueType loadWithCaching(String itemString,
       String statsCategory, CacheKeyType key,
       final Callable<ValueType> loadCallable) throws TException {
-    // TODO(todd): there a race here if an invalidation comes in while we are
-    // fetching here. Perhaps we need some locking, or need to remember the
-    // version numbers of the invalidation messages and ensure that we don't
-    // 'put' an element with a too-old version? See:
-    // https://softwaremill.com/race-condition-cache-guava-caffeine/
-    final Reference<Boolean> hit = new Reference<>(true);
+
+    // We cache Futures during loading to deal with a particularly troublesome race
+    // around invalidation (IMPALA-7534). Namely, we have the following interleaving to
+    // worry about:
+    //
+    //  Thread 1: loadTableNames() misses and sends a request to fetch table names
+    //  Catalogd: sends a response with table list ['foo']
+    //  Thread 2:    creates a table 'bar'
+    //  Catalogd:    returns an invalidation for the table name list
+    //  Thread 2:    invalidates the table list
+    //  Thread 1: response arrives with ['foo'], which is stored in the cache
+    //
+    // In this case, we've "missed" an invalidation because it arrived concurrently
+    // with the loading of a value in the cache. This is a well-known issue with
+    // Guava:
+    //
+    //    https://softwaremill.com/race-condition-cache-guava-caffeine/
+    //
+    // In order to avoid this issue, if we don't find an element in the cache, we insert
+    // a Future while we load the value. Essentially, an entry can be in one of the
+    // following states:
+    //
+    // Missing (no entry in the cache):
+    //   invalidate would be ignored, but that's OK, because any future read would fetch
+    //   new data from the catalogd, and see a version newer than the invalidate
+    //
+    // Loading (a Future<> in the cache):
+    //    invalidate removes the future. When loading completes, its attempt to swap
+    //    in the value will fail. Any request after the invalidate will cause a second
+    //    load to be triggered, which sees the post-invalidated data in catalogd.
+    //
+    //    Any concurrent *read* of the cache (with no invalidation or prior to an
+    //    invalidation) will piggy-back on the same Future and return its result when
+    //    it completes.
+    //
+    // Cached (non-Future in the cache):
+    //    no interesting race: an invalidation ensures that any future load will miss
+    //    and fetch a new value
+    //
+    // NOTE: we don't need to perform this dance for cache keys which embed a version
+    // number, because invalidation is not handled by removing cache entries, but
+    // rather by bumping top-level version numbers.
     Stopwatch sw = new Stopwatch().start();
+    boolean hit = false;
+    boolean isPiggybacked = false;
     try {
-      return (ValueType)cache_.get(key, new Callable<ValueType>() {
-        @Override
-        public ValueType call() throws Exception {
-          hit.setRef(false);
-          return loadCallable.call();
-        }
-      });
+      CompletableFuture<Object> f = new CompletableFuture<Object>();
+      // NOTE: the Cache ensures that this is an atomic operation of either returning
+      // an existing value or inserting our own. Only one thread can think it is the
+      // "loader" at a time.
+      Object inCache = cache_.get(key, () -> f);
+      if (!(inCache instanceof Future)) {
+        hit = true;
+        return (ValueType)inCache;
+      }
+
+      if (inCache != f) {
+        isPiggybacked = true;
+        Future<ValueType> existing = (Future<ValueType>)inCache;
+        ValueType ret = Uninterruptibles.getUninterruptibly(existing);
+        piggybackSuccessCountForTests.incrementAndGet();
+        return ret;
+      }
+
+      // No other thread was loading this value, so we need to fetch it ourselves.
+      try {
+        f.complete(loadCallable.call());
+        // Assuming we were able to load the value, store it back into the map
+        // as a plain-old object. This is important to get the proper weight in the
+        // map. If someone invalidated this load concurrently, this 'replace' will
+        // fail because 'f' will not be the current value.
+        cache_.asMap().replace(key, f, f.get());
+      } catch (Exception e) {
+        // If there was an exception, remove it from the map so that any later loads
+        // retry.
+        cache_.asMap().remove(key, f);
+        // Ensure any piggy-backed loaders get the exception. 'f.get()' below will
+        // throw to this caller.
+        f.completeExceptionally(e);
+      }
+      return (ValueType) Uninterruptibles.getUninterruptibly(f);
     } catch (ExecutionException | UncheckedExecutionException e) {
+      if (isPiggybacked) {
+        piggybackExceptionCountForTests.incrementAndGet();
+      }
+
       Throwables.propagateIfPossible(e.getCause(), TException.class);
+      // Since the loading code should only throw TException, we shouldn't get
+      // any other exceptions here. If for some reason we do, just rethrow as RTE.
       throw new RuntimeException(e);
     } finally {
       sw.stop();
-      addStatsToProfile(statsCategory, /*numHits=*/hit.getRef() ? 1 : 0,
-          /*numMisses=*/hit.getRef() ? 0 : 1, sw);
-      LOG.trace("Request for {}: {}", itemString, hit.getRef() ? "hit" : "miss");
+      addStatsToProfile(statsCategory, /*numHits=*/hit ? 1 : 0,
+          /*numMisses=*/hit ? 0 : 1, sw);
+      LOG.trace("Request for {}: {}{}", itemString, isPiggybacked ? "piggy-backed " : "",
+          hit ? "hit" : "miss");
     }
   }
 
@@ -431,7 +534,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     if (profile == null) return;
     final String prefix = CATALOG_FETCH_PREFIX + "." +
         Preconditions.checkNotNull(statsCategory) + ".";
-    profile.addToCounter(prefix + "Requests", TUnit.NONE, numHits + numMisses);;
+    profile.addToCounter(prefix + "Requests", TUnit.NONE, numHits + numMisses);
     profile.addToCounter(prefix + "Time", TUnit.TIME_MS,
         stopwatch.elapsed(TimeUnit.MILLISECONDS));
     if (numHits > 0) {
@@ -581,7 +684,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     List<String> missingCols = Lists.newArrayListWithCapacity(colNames.size());
     for (String colName: colNames) {
       ColStatsCacheKey cacheKey = new ColStatsCacheKey((TableMetaRefImpl)table, colName);
-      ColumnStatisticsObj val = (ColumnStatisticsObj)cache_.getIfPresent(cacheKey);
+      ColumnStatisticsObj val = (ColumnStatisticsObj) getIfPresent(cacheKey);
       if (val == null) {
         missingCols.add(colName);
       } else if (val == NEGATIVE_COLUMN_STATS_SENTINEL) {
@@ -622,6 +725,19 @@ public class CatalogdMetaProvider implements MetaProvider {
     return ret;
   }
 
+  @SuppressWarnings("unchecked")
+  private Object getIfPresent(Object cacheKey) throws TException {
+    Object existing = cache_.getIfPresent(cacheKey);
+    if (existing == null) return null;
+    if (!(existing instanceof Future)) return existing;
+    try {
+      return ((Future<Object>)existing).get();
+    } catch (InterruptedException | ExecutionException e) {
+      Throwables.propagateIfPossible(e, TException.class);
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public List<PartitionRef> loadPartitionList(final TableMetaRef table)
       throws TException {
@@ -766,14 +882,14 @@ public class CatalogdMetaProvider implements MetaProvider {
    */
   private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCache(
       TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex,
-      List<PartitionRef> partitionRefs) {
+      List<PartitionRef> partitionRefs) throws TException {
 
     Map<PartitionRef, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(
         partitionRefs.size());
     for (PartitionRef ref: partitionRefs) {
       PartitionRefImpl prefImpl = (PartitionRefImpl)ref;
       PartitionCacheKey cacheKey = new PartitionCacheKey(table, prefImpl.getId());
-      PartitionMetadataImpl val = (PartitionMetadataImpl)cache_.getIfPresent(cacheKey);
+      PartitionMetadataImpl val = (PartitionMetadataImpl)getIfPresent(cacheKey);
       if (val == null) continue;
 
       // The entry in the cache has file descriptors that are relative to the cache's
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index de6dd07..60db091 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -19,9 +19,15 @@ package org.apache.impala.catalog.local;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -45,6 +51,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.cache.CacheStats;
 import com.google.common.collect.ImmutableList;
 
@@ -241,4 +248,75 @@ public class CatalogdMetaProviderTest {
         prof.counters.get(1).toString());
     assertEquals("CatalogFetch.Tables.Time", prof.counters.get(2).name);
   }
+
+  @Test
+  public void testPiggybackSuccess() throws Exception {
+    doTestPiggyback(/*success=*/true);
+  }
+
+  @Test
+  public void testPiggybackFailure() throws Exception {
+    doTestPiggyback(/*success=*/false);
+  }
+
+  private void doTestPiggyback(boolean testSuccessCase) throws Exception {
+    // To test success, we load an existing table. Otherwise, load one that doesn't
+    // exist, which will throw an exception.
+    final String tableName = testSuccessCase ? "alltypes" : "table-does-not-exist";
+    final AtomicInteger counterToWatch = testSuccessCase ?
+        provider_.piggybackSuccessCountForTests :
+        provider_.piggybackExceptionCountForTests;
+
+    final int kNumThreads = 8;
+    ExecutorService exec = Executors.newFixedThreadPool(kNumThreads);
+    try {
+      // Run for at least 60 seconds to try to provoke the desired behavior.
+      Stopwatch sw = new Stopwatch().start();
+      while (sw.elapsed(TimeUnit.SECONDS) < 60) {
+        // Submit a wave of parallel tasks which all fetch the same table, concurrently.
+        // One of these should win whereas the others are likely to piggy-back on the
+        // same request.
+        List<Future<Object>> futures = new ArrayList<>();
+        for (int i = 0; i < kNumThreads; i++) {
+          futures.add(exec.submit(() -> provider_.loadTable("functional", tableName)));
+        }
+        for (Future<Object> f : futures) {
+          try {
+            assertNotNull(f.get());
+            if (!testSuccessCase) fail("Did not get expected exception");
+          } catch (Exception e) {
+            // If we expected success, but got an exception, we should rethrow it.
+            if (testSuccessCase) throw e;
+          }
+        }
+        if (counterToWatch.get() > 20) {
+          return;
+        }
+
+        TCatalogObject obj = new TCatalogObject(TCatalogObjectType.TABLE, 0);
+        obj.setTable(new TTable("functional", tableName));
+        provider_.invalidateCacheForObject(obj);
+      }
+      fail("Did not see enough piggybacked loads!");
+    } finally {
+      exec.shutdown();
+      assertTrue(exec.awaitTermination(60, TimeUnit.SECONDS));
+    }
+
+    // Check that, in the success case, the table was left in the cache.
+    // In the failure case, we should not have any "failed" entry persisting.
+    diffStats();
+    try {
+      provider_.loadTable("functional", tableName);
+    } catch (Exception e) {}
+    CacheStats stats = diffStats();
+    if (testSuccessCase) {
+      assertEquals(1, stats.hitCount());
+      assertEquals(0, stats.missCount());
+    } else {
+      assertEquals(0, stats.hitCount());
+      assertEquals(1, stats.missCount());
+    }
+  }
+
 }
\ No newline at end of file
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 6173bde..65e93e0 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -23,6 +23,8 @@ import random
 import threading
 import time
 
+from multiprocessing.pool import ThreadPool
+
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 RETRY_PROFILE_MSG = 'Retried query planning due to inconsistent metadata'
@@ -313,9 +315,44 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
       client1.close()
       client2.close()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true --inject_latency_after_catalog_fetch_ms=50",
+      catalogd_args="--catalog_topic_mode=minimal",
+      cluster_size=1)
+  def test_invalidation_races(self, unique_database):
+    """
+    Regression test for IMPALA-7534: races where invalidation of the table list
+    could be skipped, causing spurious "table not found" errors.
+    """
+    test_self = self
 
-class TestObservability(CustomClusterTestSuite):
+    class ThreadLocalClient(threading.local):
+      def __init__(self):
+        self.c = test_self.create_impala_client()
 
+    t = ThreadPool(processes=8)
+    tls = ThreadLocalClient()
+
+    def do_table(i):
+      for q in [
+        "create table {db}.t{i} (i int)",
+        "describe {db}.t{i}",
+        "drop table {db}.t{i}",
+        "create database {db}_{i}",
+        "show tables in {db}_{i}",
+        "drop database {db}_{i}"]:
+        self.execute_query_expect_success(tls.c, q.format(
+            db=unique_database, i=i))
+
+    # Prior to fixing IMPALA-7534, this test would fail within 20-30 iterations,
+    # so 100 should be quite reliable as a regression test.
+    NUM_ITERS = 100
+    for i in t.imap_unordered(do_table, xrange(NUM_ITERS)):
+      pass
+
+
+class TestObservability(CustomClusterTestSuite):
   def get_catalog_cache_metrics(self, impalad):
     """ Returns catalog cache metrics as a dict by scraping the json metrics page on the
     given impalad"""


[impala] 03/04: IMPALA-7802: Close connections of idle client sessions

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 67cd55e0445d037d834ecd120d08b0948b064491
Author: Michael Ho <kw...@cloudera.com>
AuthorDate: Thu Jun 6 13:32:03 2019 -0700

    IMPALA-7802: Close connections of idle client sessions
    
    Previously, if idle session timeout is set either via
    startup flag or query options, a client session will expire
    after that set period of inactivity. However, the network
    connection and the service thread of an expired session will
    still be around until the session is closed by the client.
    This is highly undesirable as these idle sessions still count
    towards the quota bound by --fe_service_threads, so if the
    total number of sessions (including the idle ones) reaches
    that upper bound, all incoming new sessions will block until
    some of the existing sessions exit. There is no time bound on
    when those expired sessions will be closed. In some sense,
    leaving many idle sessions open is a denial-of-service attack
    on Impala.
    
    This change implements support for closing expired client sessions.
    In particular, a new flag --idle_client_poll_time_s is added to
    specify a time interval in seconds of client's inactivity which
    will cause an idle service thread of a client connection to wake up
    and check if all sessions associated with the connection are idle.
    If so, the connection will be closed. This allows the service threads
    to be freed up without waiting for the client to close the connections.
    
    Testing done:
    - core build
    - new targeted test which verifies the connections of expired sessions
    are closed.
    - verified the flags function as expected in a secure cluster with Kerberos + SSL
    
    Change-Id: I97c4fb8e1b741add273f8a913fb0967303683e38
    Reviewed-on: http://gerrit.cloudera.org:8080/13607
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/TAcceptQueueServer.cpp               |  63 ++++++++++++-
 be/src/rpc/TAcceptQueueServer.h                 |   8 +-
 be/src/rpc/thrift-server.cc                     |  71 +++------------
 be/src/rpc/thrift-server.h                      | 115 +++++++++++++++++++++---
 be/src/rpc/thrift-util.cc                       |  19 ++--
 be/src/rpc/thrift-util.h                        |   7 +-
 be/src/runtime/client-cache.h                   |   4 +-
 be/src/service/impala-server.cc                 |  57 +++++++++++-
 be/src/service/impala-server.h                  |   5 ++
 tests/custom_cluster/test_hs2.py                |   4 +-
 tests/custom_cluster/test_session_expiration.py |  70 +++++++++++++--
 11 files changed, 329 insertions(+), 94 deletions(-)
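
Note for readers: the idle-poll behaviour described in the commit message can
be sketched in Python as follows. This is an illustration only, not the
Thrift/C++ code in this patch. The function wait_for_bytes() and the callback
all_sessions_idle are hypothetical names, and the period is given in seconds
here rather than milliseconds.

```python
import socket


def wait_for_bytes(sock, idle_poll_period_s, all_sessions_idle):
    """Return True if bytes are pending; False on EOF or an idle connection."""
    if idle_poll_period_s > 0:
        # Wake up periodically instead of blocking forever.
        sock.settimeout(idle_poll_period_s)
    try:
        while True:
            try:
                # MSG_PEEK leaves the data in the buffer for the real read.
                # An empty result means the peer closed the connection (EOF).
                return len(sock.recv(1, socket.MSG_PEEK)) > 0
            except socket.timeout:
                # Nothing arrived within the poll period: give up only if
                # every session on this connection has expired.
                if all_sessions_idle():
                    return False
    finally:
        # Unset the timeout so request processing is not cut short.
        sock.settimeout(None)
```

A caller would close the connection when this returns False. With a period of
0 the socket stays in blocking mode, so the idle check is never consulted,
which corresponds to the feature being disabled.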

diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 2a77662..09d7954 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -23,8 +23,11 @@
 #include "rpc/TAcceptQueueServer.h"
 
 #include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/transport/TSocket.h>
 
 #include "util/metrics.h"
+#include "rpc/thrift-util.h"
+#include "rpc/thrift-server.h"
 #include "util/thread-pool.h"
 
 DEFINE_int32(accepted_cnxn_queue_depth, 10000,
@@ -72,8 +75,10 @@ class TAcceptQueueServer::Task : public Runnable {
         if (eventHandler != nullptr) {
           eventHandler->processContext(connectionContext, transport_);
         }
-        if (!processor_->process(input_, output_, connectionContext)
-            || !input_->getTransport()->peek()) {
+        // Setting a socket timeout for process() may lead to false positives
+        // and prematurely close a slow client's connection.
+        if (!processor_->process(input_, output_, connectionContext) ||
+            !Peek(input_, connectionContext, eventHandler)) {
           break;
         }
       }
@@ -114,6 +119,56 @@ class TAcceptQueueServer::Task : public Runnable {
   }
 
  private:
+
+  // This function blocks until some bytes show up from the client.
+  // Returns true if some bytes are available from client;
+  // Returns false upon reading EOF, in which case the connection
+  // will be closed by the caller.
+  //
+  // If idle_poll_period_ms_ is not 0, this function will block up
+  // to idle_poll_period_ms_ milliseconds before waking up to check
+  // if the sessions associated with the connection have all expired
+  // due to inactivity. If so, it will return false and the connection
+  // will be closed by the caller.
+  bool Peek(shared_ptr<TProtocol> input, void* connectionContext,
+      boost::shared_ptr<TServerEventHandler> eventHandler) {
+    // Set a timeout on input socket if idle_poll_period_ms_ is non-zero.
+    TSocket* socket = static_cast<TSocket*>(transport_.get());
+    if (server_.idle_poll_period_ms_ > 0) {
+      socket->setRecvTimeout(server_.idle_poll_period_ms_);
+    }
+
+    // Block until some bytes show up or EOF or timeout.
+    bool bytes_pending = true;
+    for (;;) {
+      try {
+        bytes_pending = input_->getTransport()->peek();
+        break;
+      } catch (const TTransportException& ttx) {
+        // Implementation of the underlying transport's peek() may call either
+        // read() or peek() of the socket.
+        if (eventHandler != nullptr && server_.idle_poll_period_ms_ > 0 &&
+            (IsReadTimeoutTException(ttx) || IsPeekTimeoutTException(ttx))) {
+          ThriftServer::ThriftServerEventProcessor* thriftServerHandler =
+              static_cast<ThriftServer::ThriftServerEventProcessor*>(eventHandler.get());
+          if (thriftServerHandler->IsIdleContext(connectionContext)) {
+            const string& client = socket->getSocketInfo();
+            GlobalOutput.printf(
+               "TAcceptQueueServer closing connection to idle client %s", client.c_str());
+            bytes_pending = false;
+            break;
+          }
+        } else {
+          // Rethrow the exception to be handled by callers.
+          throw;
+        }
+      }
+    }
+    // Unset the socket timeout.
+    if (server_.idle_poll_period_ms_ > 0) socket->setRecvTimeout(0);
+    return bytes_pending;
+  }
+
   TAcceptQueueServer& server_;
   friend class TAcceptQueueServer;
 
@@ -128,10 +183,10 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<TProcessor>& proc
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
     const boost::shared_ptr<ThreadFactory>& threadFactory, const string& name,
-    int32_t maxTasks, int64_t timeout_ms)
+    int32_t maxTasks, int64_t queue_timeout_ms, int64_t idle_poll_period_ms)
     : TServer(processor, serverTransport, transportFactory, protocolFactory),
       threadFactory_(threadFactory), name_(name), maxTasks_(maxTasks),
-      queue_timeout_ms_(timeout_ms) {
+      queue_timeout_ms_(queue_timeout_ms), idle_poll_period_ms_(idle_poll_period_ms) {
   init();
 }
 
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 8f16add..08a7244 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -62,7 +62,8 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<ThreadFactory>& threadFactory,
-      const std::string& name, int32_t maxTasks = 0, int64_t timeout_ms = 0);
+      const std::string& name, int32_t maxTasks = 0,
+      int64_t queue_timeout_ms = 0, int64_t idle_poll_period_ms = 0);
 
   ~TAcceptQueueServer() override = default;
 
@@ -116,6 +117,11 @@ class TAcceptQueueServer : public TServer {
   /// Amount of time in milliseconds after which a connection request will be timed out.
   /// Default value is 0, which means no timeout.
   int64_t queue_timeout_ms_;
+
+  /// Amount of time, in milliseconds, of client's inactivity before the service thread
+  /// wakes up to check if the connection should be closed due to inactivity. If 0, no
+  /// polling happens.
+  int64_t idle_poll_period_ms_;
 };
 
 } // namespace server
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index a859c9a..c1ce270 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -106,60 +106,6 @@ bool SSLProtoVersions::IsSupported(const SSLProtocol& protocol) {
   }
 }
 
-// Helper class that starts a server in a separate thread, and handles
-// the inter-thread communication to monitor whether it started
-// correctly.
-class ThriftServer::ThriftServerEventProcessor : public TServerEventHandler {
- public:
-  ThriftServerEventProcessor(ThriftServer* thrift_server)
-      : thrift_server_(thrift_server),
-        signal_fired_(false) { }
-
-  // Called by the Thrift server implementation when it has acquired its resources and is
-  // ready to serve, and signals to StartAndWaitForServer that start-up is finished. From
-  // TServerEventHandler.
-  virtual void preServe();
-
-  // Called when a client connects; we create per-client state and call any
-  // ConnectionHandlerIf handler.
-  virtual void* createContext(boost::shared_ptr<TProtocol> input,
-      boost::shared_ptr<TProtocol> output);
-
-  // Called when a client starts an RPC; we set the thread-local connection context.
-  virtual void processContext(void* context, boost::shared_ptr<TTransport> output);
-
-  // Called when a client disconnects; we call any ConnectionHandlerIf handler.
-  virtual void deleteContext(void* serverContext, boost::shared_ptr<TProtocol> input,
-      boost::shared_ptr<TProtocol> output);
-
-  // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
-  // correctly.
-  Status StartAndWaitForServer();
-
- private:
-  // Lock used to ensure that there are no missed notifications between starting the
-  // supervision thread and calling signal_cond_.WaitUntil. Also used to ensure
-  // thread-safe access to members of thrift_server_
-  boost::mutex signal_lock_;
-
-  // Condition variable that is notified by the supervision thread once either
-  // a) all is well or b) an error occurred.
-  ConditionVariable signal_cond_;
-
-  // The ThriftServer under management. This class is a friend of ThriftServer, and
-  // reaches in to change member variables at will.
-  ThriftServer* thrift_server_;
-
-  // Guards against spurious condition variable wakeups
-  bool signal_fired_;
-
-  // The time, in milliseconds, to wait for a server to come up
-  static const int TIMEOUT_MS = 2500;
-
-  // Called in a separate thread
-  void Supervise();
-};
-
 Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
   // Locking here protects against missed notifications if Supervise executes quickly
   unique_lock<mutex> lock(signal_lock_);
@@ -310,9 +256,17 @@ void ThriftServer::ThriftServerEventProcessor::processContext(void* context,
   __connection_context__ = reinterpret_cast<ConnectionContext*>(context);
 }
 
-void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext,
+bool ThriftServer::ThriftServerEventProcessor::IsIdleContext(void* context) {
+  __connection_context__ = reinterpret_cast<ConnectionContext*>(context);
+  if (thrift_server_->connection_handler_ != nullptr) {
+    return thrift_server_->connection_handler_->IsIdleConnection(*__connection_context__);
+  }
+  return false;
+}
+
+void ThriftServer::ThriftServerEventProcessor::deleteContext(void* context,
     boost::shared_ptr<TProtocol> input, boost::shared_ptr<TProtocol> output) {
-  __connection_context__ = (ConnectionContext*) serverContext;
+  __connection_context__ = reinterpret_cast<ConnectionContext*>(context);
 
   if (thrift_server_->connection_handler_ != NULL) {
     thrift_server_->connection_handler_->ConnectionEnd(*__connection_context__);
@@ -331,12 +285,13 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
     MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms,
-    TransportType transport_type)
+    int64_t idle_poll_period_ms, TransportType transport_type)
   : started_(false),
     port_(port),
     ssl_enabled_(false),
     max_concurrent_connections_(max_concurrent_connections),
     queue_timeout_ms_(queue_timeout_ms),
+    idle_poll_period_ms_(idle_poll_period_ms),
     name_(name),
     metrics_name_(Substitute("impala.thrift-server.$0", name_)),
     server_(NULL),
@@ -497,7 +452,7 @@ Status ThriftServer::Start() {
 
   server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
       protocol_factory, thread_factory, name_, max_concurrent_connections_,
-      queue_timeout_ms_));
+      queue_timeout_ms_, idle_poll_period_ms_));
   if (metrics_ != NULL) {
     (static_cast<TAcceptQueueServer*>(server_.get()))
         ->InitMetrics(metrics_, metrics_name_);
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index f208a10..bf9a78b 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -32,6 +32,17 @@
 #include "util/metrics-fwd.h"
 #include "util/thread.h"
 
+namespace apache {
+namespace thrift {
+namespace protocol {
+class TProtocol;
+}
+namespace server {
+class TAcceptQueueServer;
+}
+}
+}
+
 namespace impala {
 
 class AuthProvider;
@@ -99,6 +110,11 @@ class ThriftServer {
     /// valid and clients must not refer to it again.
     virtual void ConnectionEnd(const ConnectionContext& connection_context) = 0;
 
+    /// Returns true if the connection is considered idle. A connection is considered
+    /// idle if all the sessions associated with it have expired due to idle timeout.
+    /// Called when a client has been inactive for --idle_client_poll_period_s seconds.
+    virtual bool IsIdleConnection(const ConnectionContext& connection_context) = 0;
+
     virtual ~ConnectionHandlerIf() = default;
   };
 
@@ -147,6 +163,74 @@ class ThriftServer {
 
  private:
   friend class ThriftServerBuilder;
+  friend class apache::thrift::server::TAcceptQueueServer;
+
+  /// Helper class which monitors starting servers. Needs access to internal members, and
+  /// is not used outside of this class.
+  friend class ThriftServerEventProcessor;
+
+  /// Helper class that starts a server in a separate thread, and handles
+  /// the inter-thread communication to monitor whether it started
+  /// correctly.
+  class ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler {
+   public:
+    ThriftServerEventProcessor(ThriftServer* thrift_server)
+      : thrift_server_(thrift_server),
+        signal_fired_(false) { }
+
+    /// Called by the Thrift server implementation when it has acquired its resources and
+    /// is ready to serve, and signals to StartAndWaitForServer that start-up is finished.
+    /// From TServerEventHandler.
+    virtual void preServe();
+
+    /// Called when a client connects; we create per-client state and call any
+    /// ConnectionHandlerIf handler.
+    virtual void* createContext(
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> input,
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> output);
+
+    /// Called when a client starts an RPC; we set the thread-local connection context.
+    virtual void processContext(void* context,
+        boost::shared_ptr<apache::thrift::transport::TTransport> output);
+
+    /// Called when a client disconnects; we call any ConnectionHandlerIf handler.
+    virtual void deleteContext(void* context,
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> input,
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> output);
+
+    /// Returns true if a client's connection is idle. A client's connection is idle iff
+    /// all the sessions associated with it have expired due to idle timeout. Called from
+    /// TAcceptQueueServer::Task::run() after clients have been inactive for
+    /// --idle_client_poll_period_s seconds.
+    bool IsIdleContext(void* context);
+
+    /// Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
+    /// correctly.
+    Status StartAndWaitForServer();
+
+   private:
+    /// Lock used to ensure that there are no missed notifications between starting the
+    /// supervision thread and calling signal_cond_.WaitUntil. Also used to ensure
+    /// thread-safe access to members of thrift_server_
+    boost::mutex signal_lock_;
+
+    /// Condition variable that is notified by the supervision thread once either
+    /// a) all is well or b) an error occurred.
+    ConditionVariable signal_cond_;
+
+    /// The ThriftServer under management. This class is a friend of ThriftServer, and
+    /// reaches in to change member variables at will.
+    ThriftServer* thrift_server_;
+
+    /// Guards against spurious condition variable wakeups
+    bool signal_fired_;
+
+    /// The time, in milliseconds, to wait for a server to come up
+    static const int TIMEOUT_MS = 2500;
+
+    /// Called in a separate thread
+    void Supervise();
+  };
 
   /// Creates, but does not start, a new server on the specified port
   /// that exports the supplied interface.
@@ -158,13 +242,17 @@ class ThriftServer {
   ///  - metrics: if not nullptr, the server will register metrics on this object
   ///  - max_concurrent_connections: The maximum number of concurrent connections allowed.
   ///    If 0, there will be no enforced limit on the number of concurrent connections.
-  ///  - amount of time in milliseconds an accepted client connection will be held in
-  ///    the accepted queue, after which the request will be rejected if a server
-  ///    thread can't be found. If 0, no timeout is enforced.
+  ///  - queue_timeout_ms: amount of time in milliseconds an accepted client connection
+  ///    will be held in the accepted queue, after which the request will be rejected if
+  ///    a service thread can't be found. If 0, no timeout is enforced.
+  ///  - idle_poll_period_ms: Amount of time, in milliseconds, of client's inactivity
+  ///    before the service thread wakes up to check if the connection should be closed
+  ///    due to inactivity. If 0, no polling happens.
   ThriftServer(const std::string& name,
       const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
       AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
       int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0,
+      int64_t idle_poll_period_ms = 0,
       TransportType server_transport = TransportType::BINARY);
 
   /// Enables secure access over SSL. Must be called before Start(). The first three
@@ -218,6 +306,11 @@ class ThriftServer {
   /// Used in TAcceptQueueServer.
   int64_t queue_timeout_ms_;
 
+  /// Amount of time, in milliseconds, of client's inactivity before the service thread
+  /// wakes up to check if the connection should be closed due to inactivity. If 0, no
+  /// polling happens.
+  int64_t idle_poll_period_ms_;
+
   /// User-specified identifier that shows up in logs
   const std::string name_;
 
@@ -263,11 +356,6 @@ class ThriftServer {
 
   /// Underlying transport type used by this thrift server.
   TransportType transport_type_;
-
-  /// Helper class which monitors starting servers. Needs access to internal members, and
-  /// is not used outside of this class.
-  class ThriftServerEventProcessor;
-  friend class ThriftServerEventProcessor;
 };
 
 /// Helper class to build new ThriftServer instances.
@@ -297,11 +385,16 @@ class ThriftServerBuilder {
     return *this;
   }
 
-  ThriftServerBuilder& queue_timeout(int64_t timeout_ms) {
+  ThriftServerBuilder& queue_timeout_ms(int64_t timeout_ms) {
     queue_timeout_ms_ = timeout_ms;
     return *this;
   }
 
+  ThriftServerBuilder& idle_poll_period_ms(int64_t timeout_ms) {
+    idle_poll_period_ms_ = timeout_ms;
+    return *this;
+  }
+
   /// Enables SSL for this server.
   ThriftServerBuilder& ssl(
       const std::string& certificate, const std::string& private_key) {
@@ -344,7 +437,8 @@ class ThriftServerBuilder {
   Status Build(ThriftServer** server) {
     std::unique_ptr<ThriftServer> ptr(
         new ThriftServer(name_, processor_, port_, auth_provider_, metrics_,
-            max_concurrent_connections_, queue_timeout_ms_, server_transport_type_));
+            max_concurrent_connections_, queue_timeout_ms_, idle_poll_period_ms_,
+            server_transport_type_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -355,6 +449,7 @@ class ThriftServerBuilder {
 
  private:
   int64_t queue_timeout_ms_ = 0;
+  int64_t idle_poll_period_ms_ = 0;
   int max_concurrent_connections_ = 0;
   std::string name_;
   boost::shared_ptr<apache::thrift::TProcessor> processor_;
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index 2c3ebc4..34384a0 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -58,9 +58,10 @@ using namespace apache::thrift::server;
 using namespace apache::thrift::protocol;
 using namespace apache::thrift::concurrency;
 
-// IsRecvTimeoutTException() and IsConnResetTException() make assumption about the
-// implementation of read(), write() and write_partial() in TSocket.cpp and those
-// functions may change between different versions of Thrift.
+// IsReadTimeoutTException(), IsPeekTimeoutTException() and IsConnResetTException() make
+// assumptions about the implementation of read(), peek(), write() and write_partial()
+// in TSocket.cpp and TSSLSocket.cpp. Those functions may change between different
+// versions of Thrift.
 static_assert(PACKAGE_VERSION[0] == '0', "");
 static_assert(PACKAGE_VERSION[1] == '.', "");
 static_assert(PACKAGE_VERSION[2] == '9', "");
@@ -158,14 +159,22 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress&
   return false;
 }
 
-bool IsRecvTimeoutTException(const TTransportException& e) {
-  // String taken from TSocket::read() Thrift's TSocket.cpp.
+bool IsReadTimeoutTException(const TTransportException& e) {
+  // Strings taken from read() in Thrift's TSocket.cpp and TSSLSocket.cpp.
   return (e.getType() == TTransportException::TIMED_OUT &&
              strstr(e.what(), "EAGAIN (timed out)") != nullptr) ||
          (e.getType() == TTransportException::INTERNAL_ERROR &&
              strstr(e.what(), "SSL_read: Resource temporarily unavailable") != nullptr);
 }
 
+bool IsPeekTimeoutTException(const TTransportException& e) {
+  // Strings taken from peek() in Thrift's TSocket.cpp and TSSLSocket.cpp.
+  return (e.getType() == TTransportException::UNKNOWN &&
+             strstr(e.what(), "recv(): Resource temporarily unavailable") != nullptr) ||
+         (e.getType() == TTransportException::INTERNAL_ERROR &&
+             strstr(e.what(), "SSL_peek: Resource temporarily unavailable") != nullptr);
+}
+
 bool IsConnResetTException(const TTransportException& e) {
   // Strings taken from TTransport::readAll(). This happens iff TSocket::read() returns 0.
   // As readAll() is reading non-zero length payload, this can only mean recv() called
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index bd15491..05e7e55 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -145,8 +145,11 @@ void PrintTColumnValue(std::ostream& out, const TColumnValue& colval);
 /// string representation
 bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& b);
 
-/// Returns true if the TTransportException corresponds to a TCP socket recv timeout.
-bool IsRecvTimeoutTException(const apache::thrift::transport::TTransportException& e);
+/// Returns true if the TTransportException corresponds to a TCP socket read timeout.
+bool IsReadTimeoutTException(const apache::thrift::transport::TTransportException& e);
+
+/// Returns true if the TTransportException corresponds to a TCP socket peek timeout.
+bool IsPeekTimeoutTException(const apache::thrift::transport::TTransportException& e);
 
 /// Returns true if the exception indicates the other end of the TCP socket was closed.
 bool IsConnResetTException(const apache::thrift::transport::TTransportException& e);
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 0b164f1..c06b979 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -241,7 +241,7 @@ class ClientConnection {
     try {
       (client_->*f)(*response, request, &send_done);
     } catch (const apache::thrift::transport::TTransportException& e) {
-      if (send_done && IsRecvTimeoutTException(e)) {
+      if (send_done && IsReadTimeoutTException(e)) {
         return RecvTimeoutStatus(typeid(*response).name());
       }
 
@@ -310,7 +310,7 @@ class ClientConnection {
     try {
       (client_->*recv_func)(*response);
     } catch (const apache::thrift::transport::TTransportException& e) {
-      if (IsRecvTimeoutTException(e)) {
+      if (IsReadTimeoutTException(e)) {
         return RecvTimeoutStatus(typeid(*response).name());
       }
       // If it's not timeout exception, then the connection is broken, stop retrying.
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index bf040f9..d7ee0df 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -30,6 +30,7 @@
 #include <boost/lexical_cast.hpp>
 #include <gperftools/malloc_extension.h>
 #include <gutil/strings/substitute.h>
+#include <gutil/walltime.h>
 #include <openssl/evp.h>
 #include <openssl/err.h>
 #include <rapidjson/rapidjson.h>
@@ -217,7 +218,12 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
 DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, that a "
     "hiveserver2 session will be maintained after the last connection that it has been "
     "used over is disconnected.");
-
+DEFINE_int32(idle_client_poll_period_s, 30, "The period of client inactivity, in "
+    "seconds, after which an Impala service thread (beeswax and HS2) wakes up to check "
+    "if the connection should be closed. If --idle_session_timeout is also set, a "
+    "client connection will be closed if all the sessions associated with it have "
+    "become idle. Set this to 0 to disable the polling behavior, in which case clients' "
+    "connections will remain open until they are explicitly closed.");
 DEFINE_int32(status_report_interval_ms, 5000, "(Advanced) Interval between profile "
     "reports in milliseconds. If set to <= 0, periodic reporting is disabled and only "
     "the final report is sent.");
@@ -2058,6 +2064,10 @@ void ImpalaServer::ConnectionEnd(
     // Not every connection must have an associated session
     if (it == connection_to_sessions_map_.end()) return;
 
+    // Sessions are not removed from the map even after they are closed and an entry
+    // won't be added to the map unless a session is established.
+    DCHECK(!it->second.empty());
+
     // We don't expect a large number of sessions per connection, so we copy it, so that
     // we can drop the map lock early.
     disconnected_sessions = std::move(it->second);
@@ -2099,6 +2109,42 @@ void ImpalaServer::ConnectionEnd(
   }
 }
 
+bool ImpalaServer::IsIdleConnection(
+    const ThriftServer::ConnectionContext& connection_context) {
+  // The set of sessions associated with this connection.
+  std::set<TUniqueId> session_ids;
+  {
+    TUniqueId connection_id = connection_context.connection_id;
+    unique_lock<mutex> l(connection_to_sessions_map_lock_);
+    ConnectionToSessionMap::iterator it = connection_to_sessions_map_.find(connection_id);
+
+    // Not every connection must have an associated session
+    if (it == connection_to_sessions_map_.end()) return false;
+
+    session_ids = it->second;
+
+    // Sessions are not removed from the map even after they are closed and an entry
+    // won't be added to the map unless a session is established. The code below relies
+    // on this invariant to not mark a connection with no session yet as idle.
+    DCHECK(!session_ids.empty());
+  }
+
+  // Check if all the sessions associated with the connection are idle.
+  {
+    lock_guard<mutex> map_lock(session_state_map_lock_);
+    for (const TUniqueId& session_id : session_ids) {
+      const auto it = session_state_map_.find(session_id);
+      if (it == session_state_map_.end()) continue;
+
+      // If any session associated with this connection is not idle,
+      // the connection is not idle.
+      lock_guard<mutex> state_lock(it->second->lock);
+      if (!it->second->expired) return false;
+    }
+  }
+  return true;
+}
+
 void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
   if (session_timeout <= 0) return;
   {
@@ -2468,7 +2514,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
           builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
           .metrics(exec_env_->metrics())
           .max_concurrent_connections(FLAGS_fe_service_threads)
-          .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
           .Build(&server));
       beeswax_server_.reset(server);
       beeswax_server_->SetConnectionHandler(this);
@@ -2496,7 +2543,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
           builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
           .metrics(exec_env_->metrics())
           .max_concurrent_connections(FLAGS_fe_service_threads)
-          .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
           .Build(&server));
       hs2_server_.reset(server);
       hs2_server_->SetConnectionHandler(this);
@@ -2529,7 +2577,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
               .transport_type(ThriftServer::TransportType::HTTP)
               .metrics(exec_env_->metrics())
               .max_concurrent_connections(FLAGS_fe_service_threads)
-              .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
               .Build(&http_server));
       hs2_http_server_.reset(http_server);
       hs2_http_server_->SetConnectionHandler(this);
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7f334ef..6131659 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -362,6 +362,11 @@ class ImpalaServer : public ImpalaServiceIf,
   /// associated with the closed connection.
   virtual void ConnectionEnd(const ThriftServer::ConnectionContext& session_context);
 
+  /// Returns true if the connection is considered idle. A connection is considered
+  /// idle if all the sessions associated with it have expired due to idle timeout.
+  /// Called when a client has been inactive for --idle_client_poll_period_s seconds.
+  virtual bool IsIdleConnection(const ThriftServer::ConnectionContext& session_context);
+
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
diff --git a/tests/custom_cluster/test_hs2.py b/tests/custom_cluster/test_hs2.py
index 9b641de..15a9153 100644
--- a/tests/custom_cluster/test_hs2.py
+++ b/tests/custom_cluster/test_hs2.py
@@ -76,8 +76,8 @@ class TestHS2(CustomClusterTestSuite):
     assert status == "Session closed because it has no active connections"
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      "--idle_session_timeout=1 --disconnected_session_timeout=5")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=1 "
+       "--disconnected_session_timeout=5 --idle_client_poll_period_s=0")
   def test_expire_disconnected_session(self):
     """Test for the interaction between idle_session_timeout and
     disconnected_session_timeout"""
diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py
index 027cc41..61b31d6 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -18,31 +18,40 @@
 # Tests for query expiration.
 
 import pytest
+import socket
 from time import sleep
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import DEFAULT_HS2_PORT
 
 class TestSessionExpiration(CustomClusterTestSuite):
   """Tests query expiration logic"""
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_session_timeout=6")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=6 "
+      "--idle_client_poll_period_s=0")
   def test_session_expiration(self, vector):
     impalad = self.cluster.get_any_impalad()
     self.__close_default_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
+    num_connections = impalad.service.get_metric_value(
+        "impala.thrift-server.beeswax-frontend.connections-in-use")
     client = impalad.service.create_beeswax_client()
     # Sleep for half the expiration time to confirm that the session is not expired early
     # (see IMPALA-838)
     sleep(3)
     assert num_expired == impalad.service.get_metric_value(
-      "impala-server.num-sessions-expired")
+        "impala-server.num-sessions-expired")
     # Wait for session expiration. Impala will poll the session expiry queue every second
     impalad.service.wait_for_metric_value(
-      "impala-server.num-sessions-expired", num_expired + 1, 20)
+        "impala-server.num-sessions-expired", num_expired + 1, 20)
+    # Verify that the idle connection is not closed.
+    assert 1 + num_connections == impalad.service.get_metric_value(
+        "impala.thrift-server.beeswax-frontend.connections-in-use")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_session_timeout=3")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=3 "
+      "--idle_client_poll_period_s=0")
   def test_session_expiration_with_set(self, vector):
     impalad = self.cluster.get_any_impalad()
     self.__close_default_clients()
@@ -64,7 +73,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
 
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_session_timeout=5")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=5 "
+       "--idle_client_poll_period_s=0")
   def test_unsetting_session_expiration(self, vector):
     impalad = self.cluster.get_any_impalad()
     self.__close_default_clients()
@@ -86,7 +96,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
       "impala-server.num-sessions-expired")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("-default_pool_max_requests 1")
+  @CustomClusterTestSuite.with_args("--default_pool_max_requests=1 "
+      "--idle_client_poll_period_s=0")
   def test_session_expiration_with_queued_query(self, vector):
     """Ensure that a query waiting in queue gets cancelled if the session expires."""
     impalad = self.cluster.get_any_impalad()
@@ -105,6 +116,53 @@ class TestSessionExpiration(CustomClusterTestSuite):
       queued_handle)
     assert "Admission result: Cancelled (queued)" in queued_query_profile
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=10 "
+      "--idle_client_poll_period_s=1", cluster_size=1)
+  def test_closing_idle_connection(self, vector):
+    """ IMPALA-7802: verifies that connections of idle sessions are closed
+    after the sessions have expired."""
+    impalad = self.cluster.get_any_impalad()
+    self.__close_default_clients()
+
+    for protocol in ['beeswax', 'hiveserver2']:
+      num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
+      num_connections_metrics_name = \
+          "impala.thrift-server.{}-frontend.connections-in-use".format(protocol)
+      num_connections = impalad.service.get_metric_value(num_connections_metrics_name)
+
+      # Connect to Impala using either beeswax or HS2 client and verify the number of
+      # opened connections.
+      if protocol == 'beeswax':
+        client = impalad.service.create_beeswax_client()
+      else:
+        client = impalad.service.create_hs2_client()
+      client.execute("select 1")
+      impalad.service.wait_for_metric_value(num_connections_metrics_name,
+           num_connections + 1, 20)
+
+      # Wait till the session has expired.
+      impalad.service.wait_for_metric_value("impala-server.num-sessions-expired",
+           num_expired + 1, 20)
+      # Wait till the idle connection is closed.
+      impalad.service.wait_for_metric_value(num_connections_metrics_name,
+           num_connections, 5)
+
+    # Verify that connecting to HS2 port without establishing a session will not cause
+    # the connection to be closed.
+    num_hs2_connections = impalad.service.get_metric_value(
+        "impala.thrift-server.hiveserver2-frontend.connections-in-use")
+    sock = socket.socket()
+    sock.connect((impalad._get_hostname(), DEFAULT_HS2_PORT))
+    impalad.service.wait_for_metric_value(
+        "impala.thrift-server.hiveserver2-frontend.connections-in-use",
+        num_hs2_connections + 1, 60)
+    # Sleep for some time for the frontend service thread to check for idleness.
+    sleep(15)
+    assert num_hs2_connections + 1 == impalad.service.get_metric_value(
+        "impala.thrift-server.hiveserver2-frontend.connections-in-use")
+    sock.close()
+
   def __close_default_clients(self):
     """Close the clients that were automatically created by setup_class(). These clients
     can expire during test, which results in metrics that tests depend on changing. Each


[impala] 04/04: IMPALA-7608: Estimate row count from file size when no stats available

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b3b00da1a1c7b98e84debe11c10258c4a0dff944
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Thu May 23 17:00:33 2019 -0700

    IMPALA-7608: Estimate row count from file size when no stats available
    
    Added a feature that estimates the number of rows in an hdfs table from
    its file sizes when the row-count statistics for the table are not
    available.
    
    Also added a query option, DISABLE_HDFS_NUM_ROWS_ESTIMATE, to turn the
    estimate off in case of regressions.
    
    Testing:
    (1) In CardinalityTest.java, replaced the original statement
    "verifyCardinality("SELECT a FROM functional.tinytable", -1);" in
    the method testBasicsWithoutStats() with
    "verifyCardinality("SELECT a FROM functional.tinytable", 2);".
    (2) In CardinalityTest.java, added more tests to check the cardinality
    of most PlanNode implementations. For each tested PlanNode, the behaviors
    before and after we disable the feature are both tested.
    (3) In set.test, modified three related test cases to make sure that
    the added query option is included after executing "set all" in various
    scenarios.
    (4) There are 8 JUnit tests in PlannerTest.java that would produce different
    distributed query plans when this feature is enabled. Added an additional
    JUnit test for 6 of those 8 affected JUnit tests that runs with this
    feature enabled. Specifically, each tested query in the newly added test
    files involves at least one hdfs table without available statistics.
    We do not add test cases for the other 2 affected JUnit tests with this
    feature enabled since doing so results in flaky tests. These two JUnit
    tests are testResourceRequirements() and testSpillableBufferSizing(). In
    this patch we only test them when the feature is disabled.
    (5) There are 5 Python end-to-end tests containing queries that would
    produce different results when this feature is enabled. Added an
    additional query for each affected query that runs with this feature
    disabled.
    
    Change-Id: Ic414121c8df0d5222e4aeea096b5365beb04568a
    Reviewed-on: http://gerrit.cloudera.org:8080/12974
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |  12 +-
 common/thrift/ImpalaInternalService.thrift         |   4 +
 common/thrift/ImpalaService.thrift                 |   4 +
 .../org/apache/impala/catalog/HdfsCompression.java |   2 +-
 .../org/apache/impala/planner/HdfsScanNode.java    | 118 +++-
 .../org/apache/impala/planner/CardinalityTest.java | 629 ++++++++++++++++++++-
 .../org/apache/impala/planner/PlannerTest.java     |  83 ++-
 .../org/apache/impala/planner/PlannerTestBase.java |   5 +-
 ...str-mode-shuffle-hdfs-num-rows-est-enabled.test |  76 +++
 ...k-join-detection-hdfs-num-rows-est-enabled.test |  86 +++
 .../joins-hdfs-num-rows-est-enabled.test           | 574 +++++++++++++++++++
 .../queries/PlannerTest/joins.test                 |  88 +++
 ...-runtime-filters-hdfs-num-rows-est-enabled.test |  59 ++
 ...t-dop-validation-hdfs-num-rows-est-enabled.test | 281 +++++++++
 .../PlannerTest/spillable-buffer-sizing.test       |   4 +-
 ...subquery-rewrite-hdfs-num-rows-est-enabled.test |  31 +
 .../QueryTest/admission-max-min-mem-limits.test    |  14 +
 .../queries/QueryTest/explain-level2.test          |  14 +
 .../queries/QueryTest/inline-view.test             |  31 +
 .../queries/QueryTest/runtime_row_filters.test     |  13 +
 .../functional-query/queries/QueryTest/set.test    |   3 +
 .../queries/QueryTest/stats-extrapolation.test     |  33 +-
 23 files changed, 2144 insertions(+), 24 deletions(-)
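
The estimate described in the commit message can be sketched as follows.
This is an illustrative Python model, not the code in the patch: the
function names, the string format labels, and the per-file (rather than
per-partition) accounting are assumptions made for the example. Only the
three compression factors and the fallback row width of 1.0 are taken from
HdfsScanNode.java in the diff that follows.

```python
import math

# Factors and fallback width as they appear in HdfsScanNode.java.
FACTOR_UNCOMPRESSED = 1.0
FACTOR_LEGACY = 3.58
FACTOR_COLUMNAR = 4.97
DEFAULT_ROW_WIDTH = 1.0

# Hypothetical string labels standing in for HdfsFileFormat values.
LEGACY_FORMATS = {"rc", "text", "lzo_text", "seq", "avro"}
COLUMNAR_FORMATS = {"parquet", "orc"}


def java_round(x):
    """Mimic Java's Math.round(double): round half up."""
    return int(math.floor(x + 0.5))


def estimated_size(file_format, file_length, compressed):
    """Scale an on-disk size up to an estimated uncompressed size."""
    if file_format == "text":
        # Text files are only scaled when they carry a compression suffix.
        factor = FACTOR_LEGACY if compressed else FACTOR_UNCOMPRESSED
    elif file_format in LEGACY_FORMATS:
        factor = FACTOR_LEGACY
    elif file_format in COLUMNAR_FORMATS:
        factor = FACTOR_COLUMNAR
    else:
        raise ValueError("unknown format: %s" % file_format)
    return java_round(file_length * factor)


def estimate_num_rows(files, row_width):
    """files: iterable of (format, length_in_bytes, compressed) tuples.
    row_width: summed per-column sizes; 0 means no scalar columns."""
    total = sum(estimated_size(fmt, length, comp) for fmt, length, comp in files)
    width = row_width if row_width > 0 else DEFAULT_ROW_WIDTH
    return java_round(total / width)
```

For instance, under these assumptions a 1000-byte uncompressed text file
with a 10-byte row width yields estimate_num_rows([("text", 1000, False)],
10.0) == 100, while the same bytes stored as Parquet are scaled by 4.97
before the division.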

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index dc1c365..d7cd260 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -796,6 +796,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_parquet_page_row_count_limit(row_count_limit);
         break;
       }
+      case TImpalaQueryOptions::DISABLE_HDFS_NUM_ROWS_ESTIMATE: {
+        query_options->__set_disable_hdfs_num_rows_estimate(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 014e6c8..b4ffad2 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -39,9 +39,15 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // Macro to help generate functions that use or manipulate query options.
 // If the DCHECK is hit then handle the missing query option below and update
 // the DCHECK.
+// Specifically, the DCHECK will make sure that the number of elements in
+// the map _TImpalaQueryOptions_VALUES_TO_NAMES automatically generated in
+// ImpalaService_types.cpp is equal to the largest integer associated with an
+// option in the enum TImpalaQueryOptions (defined in ImpalaService.thrift)
+// plus one. Thus, the second argument to the DCHECK has to be updated every
+// time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PARQUET_PAGE_ROW_COUNT_LIMIT + 1);\
+      TImpalaQueryOptions::DISABLE_HDFS_NUM_ROWS_ESTIMATE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -160,7 +166,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(parquet_write_page_index, PARQUET_WRITE_PAGE_INDEX,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(parquet_page_row_count_limit, PARQUET_PAGE_ROW_COUNT_LIMIT,\
-      TQueryOptionLevel::ADVANCED)
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(disable_hdfs_num_rows_estimate, DISABLE_HDFS_NUM_ROWS_ESTIMATE,\
+      TQueryOptionLevel::REGULAR)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 1659b4e..4f5a070 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -348,6 +348,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift.
   83: optional i32 parquet_page_row_count_limit;
+
+  // Disable the attempt to compute an estimated number of rows in an
+  // hdfs table.
+  84: optional bool disable_hdfs_num_rows_estimate = false;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 1b746e2..46ca91c 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -396,6 +396,10 @@ enum TImpalaQueryOptions {
 
   // Maximum number of rows written in a single Parquet data page.
   PARQUET_PAGE_ROW_COUNT_LIMIT = 82
+
+  // Disable the attempt to compute an estimated number of rows in an
+  // hdfs table.
+  DISABLE_HDFS_NUM_ROWS_ESTIMATE = 83
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index 23282c3..b9218bd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -44,7 +44,7 @@ public enum HdfsCompression {
   ZSTD;
 
   /* Map from a suffix to a compression type */
-  private static final ImmutableMap<String, HdfsCompression> SUFFIX_MAP =
+  public static final ImmutableMap<String, HdfsCompression> SUFFIX_MAP =
       ImmutableMap.<String, HdfsCompression>builder().
           put("deflate", DEFLATE).
           put("gz", GZIP).
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 2a2f4e6..4f084f2 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
@@ -52,13 +51,14 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
-import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -87,7 +87,6 @@ import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +94,8 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -160,6 +161,42 @@ public class HdfsScanNode extends ScanNode {
   // Read size for Parquet and ORC footers. Matches HdfsScanner::FOOTER_SIZE in backend.
   private static final long FOOTER_SIZE = 100L * 1024L;
 
+  // When the information of cardinality is not available for the underlying hdfs table,
+  // i.e., the field of cardinality_ is equal to -1, we will attempt to compute an
+  // estimate for the number of rows in getStatsNumRows().
+  // Specifically, we divide the files into 3 categories - uncompressed,
+  // legacy compressed (e.g., text, avro, rc, seq), and
+  // columnar (e.g., parquet and orc).
+  // Depending on the category of a file, we multiply the size of the file by
+  // its corresponding compression factor to derive an estimated original size
+  // of the file before compression.
+  // These estimates were computed based on the empirical compression ratios
+  // that we have observed for 3 tables in our tpch datasets:
+  // customer, lineitem, and orders.
+  // The max compression ratio we have seen for legacy formats is 3.58, whereas
+  // the max compression ratio we have seen for columnar formats is 4.97.
+  private static double ESTIMATED_COMPRESSION_FACTOR_UNCOMPRESSED = 1.0;
+  private static double ESTIMATED_COMPRESSION_FACTOR_LEGACY = 3.58;
+  private static double ESTIMATED_COMPRESSION_FACTOR_COLUMNAR = 4.97;
+
+  private static Set<HdfsFileFormat> VALID_LEGACY_FORMATS =
+      ImmutableSet.<HdfsFileFormat>builder()
+      .add(HdfsFileFormat.RC_FILE)
+      .add(HdfsFileFormat.TEXT)
+      .add(HdfsFileFormat.LZO_TEXT)
+      .add(HdfsFileFormat.SEQUENCE_FILE)
+      .add(HdfsFileFormat.AVRO)
+      .build();
+
+  private static Set<HdfsFileFormat> VALID_COLUMNAR_FORMATS =
+      ImmutableSet.<HdfsFileFormat>builder()
+      .add(HdfsFileFormat.PARQUET)
+      .add(HdfsFileFormat.ORC)
+      .build();
+
+  // An estimate of the width of a row when the information is not available.
+  private double DEFAULT_ROW_WIDTH_ESTIMATE = 1.0;
+
   private final FeFsTable tbl_;
 
   // List of partitions to be scanned. Partitions have been pruned.
@@ -275,6 +312,7 @@ public class HdfsScanNode extends ScanNode {
   // Conjuncts used to trim the set of partitions passed to this node.
   // Used only to display EXPLAIN information.
   private final List<Expr> partitionConjuncts_;
+
   /**
    * Construct a node to scan given data files into tuples described by 'desc',
    * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -1044,7 +1082,7 @@ public class HdfsScanNode extends ScanNode {
   public void computeStats(Analyzer analyzer) {
     Preconditions.checkNotNull(scanRangeSpecs_);
     super.computeStats(analyzer);
-    computeCardinalities();
+    computeCardinalities(analyzer);
     computeNumNodes(analyzer, cardinality_);
   }
 
@@ -1065,11 +1103,11 @@ public class HdfsScanNode extends ScanNode {
    * Sets these members:
    * extrapolatedNumRows_, inputCardinality_, cardinality_
    */
-  private void computeCardinalities() {
+  private void computeCardinalities(Analyzer analyzer) {
     // Choose between the extrapolated row count and the one based on stored stats.
     extrapolatedNumRows_ = FeFsTable.Utils.getExtrapolatedNumRows(tbl_,
             sumValues(totalBytesPerFs_));
-    long statsNumRows = getStatsNumRows();
+    long statsNumRows = getStatsNumRows(analyzer.getQueryOptions());
     if (extrapolatedNumRows_ != -1) {
       // The extrapolated row count is based on the 'totalBytesPerFs_' which already
       // accounts for table sampling, so no additional adjustment for sampling is
@@ -1133,7 +1171,7 @@ public class HdfsScanNode extends ScanNode {
    * Sets these members:
    * numPartitionsWithNumRows_, partitionNumRows_, hasCorruptTableStats_.
    */
-  private long getStatsNumRows() {
+  private long getStatsNumRows(TQueryOptions queryOptions) {
     numPartitionsWithNumRows_ = 0;
     partitionNumRows_ = -1;
     hasCorruptTableStats_ = false;
@@ -1157,12 +1195,78 @@ public class HdfsScanNode extends ScanNode {
     // Table is unpartitioned or the table is partitioned but no partitions have stats.
     // Set cardinality based on table-level stats.
     long numRows = tbl_.getNumRows();
+    // Unless the query option disable_hdfs_num_rows_estimate is set, if numRows
+    // is still not available, we provide a crude estimate by computing
+    // sumAvgRowSizes, the sum of the average serialized size (or, failing that,
+    // the slot size) of each column of scalar type, and dividing the estimated
+    // table size from computeEstimatedTableSize() by it.
+    if (!queryOptions.disable_hdfs_num_rows_estimate && numRows == -1L) {
+      // Compute the estimated table size when taking compression into consideration
+      long estimatedTableSize = computeEstimatedTableSize();
+
+      double sumAvgRowSizes = 0.0;
+      for (Column col : tbl_.getColumns()) {
+        Type currentType = col.getType();
+        if (currentType instanceof ScalarType) {
+          if (col.getStats().hasAvgSize()) {
+            sumAvgRowSizes = sumAvgRowSizes + col.getStats().getAvgSerializedSize();
+          } else {
+            sumAvgRowSizes = sumAvgRowSizes + col.getType().getSlotSize();
+          }
+        }
+      }
+
+      if (sumAvgRowSizes == 0.0) {
+        // When every column is of a non-scalar type (e.g., ArrayType or
+        // MapType), sumAvgRowSizes is equal to 0. In this case, we fall back
+        // to DEFAULT_ROW_WIDTH_ESTIMATE as the row width.
+        numRows = Math.round(estimatedTableSize / DEFAULT_ROW_WIDTH_ESTIMATE);
+      } else {
+        numRows = Math.round(estimatedTableSize / sumAvgRowSizes);
+      }
+    }
     if (numRows < -1 || (numRows == 0 && tbl_.getTotalHdfsBytes() > 0)) {
       hasCorruptTableStats_ = true;
     }
     return numRows;
   }
 
+  /** Compute the estimated table size when taking compression into consideration */
+  private long computeEstimatedTableSize() {
+    long estimatedTableSize = 0;
+    for (FeFsPartition p: partitions_) {
+      HdfsFileFormat format = p.getFileFormat();
+      long estimatedPartitionSize = 0;
+      if (format == HdfsFileFormat.TEXT) {
+        for (FileDescriptor desc : p.getFileDescriptors()) {
+          HdfsCompression compression
+            = HdfsCompression.fromFileName(desc.getRelativePath().toString());
+          if (HdfsCompression.SUFFIX_MAP.containsValue(compression)) {
+            estimatedPartitionSize += Math.round(desc.getFileLength()
+                * ESTIMATED_COMPRESSION_FACTOR_LEGACY);
+          } else {
+            // When the text file is not compressed.
+            estimatedPartitionSize += Math.round(desc.getFileLength()
+                * ESTIMATED_COMPRESSION_FACTOR_UNCOMPRESSED);
+          }
+        }
+      } else {
+        // When the current partition is not a text file.
+        if (VALID_LEGACY_FORMATS.contains(format)) {
+          estimatedPartitionSize += Math.round(p.getSize()
+              * ESTIMATED_COMPRESSION_FACTOR_LEGACY);
+        } else {
+         Preconditions.checkState(VALID_COLUMNAR_FORMATS.contains(format),
+             "Unknown HDFS compressed format: %s", format);
+         estimatedPartitionSize += Math.round(p.getSize()
+             * ESTIMATED_COMPRESSION_FACTOR_COLUMNAR);
+        }
+      }
+      estimatedTableSize += estimatedPartitionSize;
+    }
+    return estimatedTableSize;
+  }
+
   /**
    * Estimate the number of impalad nodes that this scan node will execute on (which is
    * ultimately determined by the scheduling done by the backend's Scheduler).
diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
index e1e4433..441c010 100644
--- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
@@ -20,7 +20,10 @@ package org.apache.impala.planner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.service.Frontend.PlanCtx;
@@ -29,12 +32,16 @@ import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * Test the planner's inference of tuple cardinality from metadata NDV and
  * resulting selectivity.
  */
 public class CardinalityTest extends PlannerTestBase {
 
+  private static double CARDINALITY_TOLERANCE = 0.05;
+
   /**
    * Test the happy path: table with stats, no all-null cols.
    */
@@ -206,8 +213,531 @@ public class CardinalityTest extends PlannerTestBase {
 
   @Test
   public void testBasicsWithoutStats() {
-    // IMPALA-7608: no cardinality is available (result is -1)
-    verifyCardinality("SELECT a FROM functional.tinytable", -1);
+    verifyApproxCardinality("SELECT a FROM functional.tinytable", 2);
+  }
+
+  @Test
+  public void testBasicsWithoutStatsWithHDFSNumRowsEstDisabled() {
+    verifyCardinality("SELECT a FROM functional.tinytable", -1, false,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE));
+  }
+
+  /**
+   *  functional.alltypesmixedformat is a table of 4 partitions, each having a different
+   *  format. These formats are text, sequence file, rc file, and parquet.
+   *  True cardinality of functional.alltypesmixedformat is 1200.
+   *  Estimated cardinality of functional.alltypesmixedformat is 2536.
+   */
+  @Test
+  public void testTableOfMixedTypesWithoutStats() {
+    verifyApproxCardinality("SELECT * FROM functional.alltypesmixedformat", 2536);
+  }
+
+  @Test
+  public void testTableOfMixedTypesWithoutStatsWithHDFSNumRowsEstDisabled() {
+    verifyCardinality("SELECT * FROM functional.alltypesmixedformat", -1, false,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE));
+  }
+
+
+  /**
+   * tpch_text_gzip.lineitem is a table with 1 partition containing 6 files.
+   * True cardinality of tpch_text_gzip.lineitem is 6,001,215.
+   * Estimated cardinality of tpch_text_gzip.lineitem is 5,141,177.
+   */
+  @Test
+  public void testTableOfMultipleFilesWithoutStats() {
+    verifyApproxCardinality("SELECT * FROM tpch_text_gzip.lineitem", 5_141_177);
+  }
+
+  @Test
+  public void testTableOfMultipleFilesWithoutStatsWithHDFSNumRowsEstDisabled() {
+    verifyCardinality("SELECT * FROM tpch_text_gzip.lineitem", -1, false,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE));
+  }
+
+  @Test
+  public void testAggregationNodeCount() {
+    // Create the paths to the AggregationNode's of interest
+    // in a distributed plan not involving GROUP BY.
+    // Since there are two resulting AggregationNode's, we create two paths.
+    List<Integer> pathToFirstAggregationNode = Arrays.asList();
+    List<Integer> pathToSecondAggregationNode = Arrays.asList(0, 0);
+
+    // There are no statistics available for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // Estimated cardinality of functional.tinytable is 2.
+    // Estimated cardinality of the resulting AggregationNode's not involving
+    // GROUP BY is 1 for both AggregationNode's no matter whether or not
+    // there are statistics available for the underlying hdfs table.
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable", 1, true,
+        ImmutableSet.of(), pathToFirstAggregationNode, AggregationNode.class);
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable", 1, true,
+        ImmutableSet.of(), pathToSecondAggregationNode, AggregationNode.class);
+
+  }
+
+  @Test
+  public void testAggregationNodeCountWithHDFSNumRowsEstDisabled() {
+    // Create the paths to the AggregationNode's of interest
+    // in a distributed plan not involving GROUP BY.
+    // Since there are two resulting AggregationNode's, we create two paths.
+    List<Integer> pathToFirstAggregationNode = Arrays.asList();
+    List<Integer> pathToSecondAggregationNode = Arrays.asList(0, 0);
+
+    // There are no statistics available for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // With the estimate disabled, the cardinality of functional.tinytable is -1.
+    // Estimated cardinality of the resulting AggregationNode's not involving
+    // GROUP BY is 1 for both AggregationNode's no matter whether or not
+    // there are statistics available for the underlying hdfs table.
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable", 1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        pathToFirstAggregationNode, AggregationNode.class);
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable", 1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        pathToSecondAggregationNode, AggregationNode.class);
+
+  }
+
+  @Test
+  public void testAggregationNodeGroupBy() {
+    // Create the paths to the AggregationNode's of interest
+    // in a distributed plan involving GROUP BY.
+    // Since there are two resulting AggregationNode's, we create two paths.
+    List<Integer> pathToFirstAggregationNode = Arrays.asList(0);
+    List<Integer> pathToSecondAggregationNode = Arrays.asList(0, 0, 0);
+
+    // There are no statistics available for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // Estimated cardinality of functional.tinytable is 2.
+    // Estimated cardinality of the resulting AggregationNode's involving
+    // GROUP BY is 2 for both AggregationNode's.
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable "
+        + "GROUP BY a", 2, true, ImmutableSet.of(),
+        pathToFirstAggregationNode, AggregationNode.class);
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable "
+        + "GROUP BY a", 2, true, ImmutableSet.of(),
+        pathToSecondAggregationNode, AggregationNode.class);
+  }
+
+  @Test
+  public void testAggregationNodeGroupByWithHDFSNumRowsEstDisabled() {
+    // Create the paths to the AggregationNode's of interest
+    // in a distributed plan involving GROUP BY.
+    // Since there are two resulting AggregationNode's, we create two paths.
+    List<Integer> pathToFirstAggregationNode = Arrays.asList(0);
+    List<Integer> pathToSecondAggregationNode = Arrays.asList(0, 0, 0);
+
+    // There are no statistics available for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // Estimated cardinality of the resulting AggregationNode's involving
+    // GROUP BY is -1 for both AggregationNode's.
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable "
+        + "GROUP BY a", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        pathToFirstAggregationNode, AggregationNode.class);
+    verifyApproxCardinality("SELECT COUNT(a) FROM functional.tinytable "
+        + "GROUP BY a", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        pathToSecondAggregationNode, AggregationNode.class);
+  }
+
+  @Test
+  public void testAnalyticEvalNode() {
+    // Since the root node of the generated distributed plan is
+    // the AnalyticEvalNode of interest, we do not have to create a
+    // path to it explicitly.
+    List<Integer> path = Arrays.asList();
+
+    // There are no statistics available for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    // Estimated cardinality of functional_parquet.alltypestiny is 742.
+    verifyApproxCardinality("SELECT SUM(int_col) OVER() int_col "
+        + "FROM functional_parquet.alltypestiny", 742, true,
+        ImmutableSet.of(), path, AnalyticEvalNode.class);
+  }
+
+  @Test
+  public void testAnalyticEvalNodeWithHDFSNumRowsEstDisabled() {
+    // Since the root node of the generated distributed plan is
+    // the AnalyticEvalNode of interest, we do not have to create a
+    // path to it explicitly.
+    List<Integer> path = Arrays.asList();
+
+    // There are no statistics available for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    verifyApproxCardinality("SELECT SUM(int_col) OVER() int_col "
+        + "FROM functional_parquet.alltypestiny", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, AnalyticEvalNode.class);
+  }
+
+  @Test
+  public void testCardinalityCheckNode() {
+    // Create the path to the CardinalityCheckNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 0);
+
+    String subQuery = "(SELECT id "
+        + "FROM functional_parquet.alltypestiny b "
+        + "WHERE id = 1)";
+
+    // There are no statistics available for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    // Estimated cardinality of functional_parquet.alltypestiny is 742.
+    verifyApproxCardinality("SELECT bigint_col "
+        + "FROM functional_parquet.alltypestiny a "
+        + "WHERE id = " + subQuery, 1, true,
+        ImmutableSet.of(), path, CardinalityCheckNode.class);
+  }
+
+  @Test
+  public void testCardinalityCheckNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the CardinalityCheckNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 0);
+
+    String subQuery = "(SELECT id "
+        + "FROM functional_parquet.alltypestiny b "
+        + "WHERE id = 1)";
+
+    // There are no statistics available for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    verifyApproxCardinality("SELECT bigint_col "
+        + "FROM functional_parquet.alltypestiny a "
+        + "WHERE id = " + subQuery, 1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, CardinalityCheckNode.class);
+  }
+
+  @Test
+  public void testEmptySetNode() {
+    // Since the root node of the generated distributed plan is
+    // the EmptySetNode of interest, we do not have to create a
+    // path to it explicitly.
+    List<Integer> path = Arrays.asList();
+
+    // There are no statistics available for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    // Estimated cardinality of functional_parquet.alltypestiny is 742.
+    String subQuery = "(SELECT * "
+        + "FROM functional_parquet.alltypestiny "
+        + "LIMIT 0)";
+    verifyApproxCardinality("SELECT 1 "
+        + "FROM functional_parquet.alltypessmall "
+        + "WHERE EXISTS " + subQuery, 0, true,
+        ImmutableSet.of(), path, EmptySetNode.class);
+  }
+
+  @Test
+  public void testEmptySetNodeWithHDFSNumRowsEstDisabled() {
+    // Since the root node of the generated distributed plan is
+    // the EmptySetNode of interest, we do not have to create a
+    // path to it explicitly.
+    List<Integer> path = Arrays.asList();
+
+    // There are no statistics available for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    String subQuery = "(SELECT * "
+        + "FROM functional_parquet.alltypestiny "
+        + "LIMIT 0)";
+    verifyApproxCardinality("SELECT 1 "
+        + "FROM functional_parquet.alltypessmall "
+        + "WHERE EXISTS " + subQuery, 0, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, EmptySetNode.class);
+  }
+
+  @Test
+  public void testExchangeNode() {
+    // Since the root node of the generated distributed plan is
+    // the ExchangeNode of interest, we do not have to create a
+    // path to it explicitly.
+    List<Integer> path = Arrays.asList();
+
+    // There are no statistics available for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // Estimated cardinality of functional.tinytable is 2.
+    verifyApproxCardinality("SELECT a FROM functional.tinytable", 2, true,
+        ImmutableSet.of(), path, ExchangeNode.class);
+  }
+
+  @Test
+  public void testExchangeNodeWithHDFSNumRowsEstDisabled() {
+    // Since the root node of the generated distributed plan is
+    // the ExchangeNode of interest, we do not have to create a
+    // path to it explicitly.
+    List<Integer> path = Arrays.asList();
+
+    // There are no statistics available for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    verifyApproxCardinality("SELECT a FROM functional.tinytable", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, ExchangeNode.class);
+  }
+
+  @Test
+  public void testHashJoinNode() {
+    // Create the path to the HashJoinNode of interest
+    // in the distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // Estimated cardinality of functional.tinytable is 2.
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional.tinytable x INNER JOIN "
+        + "functional.tinytable y ON x.a = y.a", 2, true,
+        ImmutableSet.of(), path, HashJoinNode.class);
+  }
+
+  @Test
+  public void testHashJoinNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the HashJoinNode of interest
+    // in the distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional.tinytable x INNER JOIN "
+        + "functional.tinytable y ON x.a = y.a", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, HashJoinNode.class);
+  }
+
+  @Test
+  public void testNestedLoopJoinNode() {
+    // Create the path to the NestedLoopJoinNode of interest
+    // in the distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    // Estimated cardinality of functional_parquet.alltypestiny is 742.
+    // Estimated cardinality of the NestedLoopJoinNode is 550,564 = 742 * 742.
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional_parquet.alltypestiny a, "
+        + "functional_parquet.alltypestiny b", 550_564, true,
+        ImmutableSet.of(), path, NestedLoopJoinNode.class);
+  }
+
+  @Test
+  public void testNestedLoopJoinNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the NestedLoopJoinNode of interest
+    // in the distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional_parquet.alltypestiny a, "
+        + "functional_parquet.alltypestiny b", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, NestedLoopJoinNode.class);
+  }
+
+  @Test
+  public void testHdfsScanNode() {
+    // Create the path to the HdfsScanNode of interest
+    // in the distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // Estimated cardinality of functional.tinytable is 2.
+    verifyApproxCardinality("SELECT a FROM functional.tinytable", 2, true,
+        ImmutableSet.of(), path, HdfsScanNode.class);
+  }
+
+  @Test
+  public void testHdfsScanNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the HdfsScanNode of interest
+    // in the distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    verifyApproxCardinality("SELECT a FROM functional.tinytable", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, HdfsScanNode.class);
+  }
+
+  // TODO(IMPALA-8647): It seems that the cardinality of the SelectNode should be 1
+  // instead of 0. Specifically, if we had executed "compute stats
+  // functional_parquet.alltypestiny" before issuing this
+  // SQL statement, the returned cardinality of this SelectNode would be 1
+  // instead of 0. It is unclear whether this is a bug. It looks like the cardinality
+  // of a SelectNode depends on whether stats information is associated
+  // with its child node. The cardinality of a SelectNode would still be 0
+  // even if its child node (ExchangeNode in this case) has a non-zero cardinality.
+  @Test
+  public void testSelectNode() {
+    // Create the path to the SelectNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 0);
+
+    // There are no available statistics for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    // Estimated cardinality of functional_parquet.alltypestiny is 523.
+    // There are no available statistics for functional_parquet.alltypessmall.
+    // True cardinality of functional_parquet.alltypessmall is 100.
+    // Estimated cardinality of functional_parquet.alltypessmall is 649.
+    String subQuery = "(SELECT int_col "
+        + "FROM functional_parquet.alltypestiny "
+        + "LIMIT 1)";
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional_parquet.alltypessmall "
+        + "WHERE 1 IN " + subQuery, 0, true,
+        ImmutableSet.of(), path, SelectNode.class);
+  }
+
+  @Test
+  public void testSelectNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the SelectNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 0);
+
+    // There are no available statistics for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    // There are no available statistics for functional_parquet.alltypessmall.
+    // True cardinality of functional_parquet.alltypessmall is 100.
+    String subQuery = "(SELECT int_col "
+        + "FROM functional_parquet.alltypestiny "
+        + "LIMIT 1)";
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional_parquet.alltypessmall "
+        + "WHERE 1 IN " + subQuery, 0, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, SelectNode.class);
+  }
+
+  @Test
+  public void testSingularRowSrcNode() {
+    // Create the path to the SingularRowSrcNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 1);
+
+    verifyApproxCardinality("SELECT c_custkey, pos "
+        + "FROM tpch_nested_parquet.customer c, c.c_orders", 1, true,
+        ImmutableSet.of(), path, SingularRowSrcNode.class);
+  }
+
+  @Test
+  public void testSingularRowSrcNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the SingularRowSrcNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 1);
+
+    verifyApproxCardinality("SELECT c_custkey, pos "
+        + "FROM tpch_nested_parquet.customer c, c.c_orders", 1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, SingularRowSrcNode.class);
+  }
+
+  @Test
+  public void testSortNode() {
+    // Create the path to the SortNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    // Estimated cardinality of functional_parquet.alltypestiny is 742.
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional_parquet.alltypestiny "
+        + "ORDER BY int_col", 742, true, ImmutableSet.of(), path,
+        SortNode.class);
+  }
+
+  @Test
+  public void testSortNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the SortNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional_parquet.alltypestiny.
+    // True cardinality of functional_parquet.alltypestiny is 8.
+    verifyApproxCardinality("SELECT * "
+        + "FROM functional_parquet.alltypestiny "
+        + "ORDER BY int_col", -1, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, SortNode.class);
+  }
+
+  @Test
+  public void testSubPlanNode() {
+    // Create the path to the SubplanNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    verifyApproxCardinality("SELECT c_custkey, pos "
+        + "FROM tpch_nested_parquet.customer c, c.c_orders", 1_500_000, true,
+        ImmutableSet.of(), path, SubplanNode.class);
+  }
+
+  @Test
+  public void testSubPlanNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the SubplanNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    verifyApproxCardinality("SELECT c_custkey, pos "
+        + "FROM tpch_nested_parquet.customer c, c.c_orders", 1_500_000, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, SubplanNode.class);
+  }
+
+  @Test
+  public void testUnionNode() {
+    // Create the path to the UnionNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    // Estimated cardinality of functional.tinytable is 2.
+    String subQuery = "SELECT * FROM functional.tinytable";
+    verifyApproxCardinality(subQuery + " UNION ALL " + subQuery, 4, true,
+        ImmutableSet.of(), path, UnionNode.class);
+  }
+
+  @Test
+  public void testUnionNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the UnionNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0);
+
+    // There are no available statistics for functional.tinytable.
+    // True cardinality of functional.tinytable is 3.
+    String subQuery = "SELECT * FROM functional.tinytable";
+    verifyApproxCardinality(subQuery + " UNION ALL " + subQuery, 0, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, UnionNode.class);
+  }
+
+  @Test
+  public void testUnnestNode() {
+    // Create the path to the UnnestNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 0);
+
+    verifyApproxCardinality("SELECT c_custkey, pos "
+        + "FROM tpch_nested_parquet.customer c, c.c_orders", 10, true,
+        ImmutableSet.of(), path, UnnestNode.class);
+  }
+
+  @Test
+  public void testUnnestNodeWithHDFSNumRowsEstDisabled() {
+    // Create the path to the UnnestNode of interest
+    // in a distributed plan.
+    List<Integer> path = Arrays.asList(0, 1, 0);
+
+    verifyApproxCardinality("SELECT c_custkey, pos "
+        + "FROM tpch_nested_parquet.customer c, c.c_orders", 10, true,
+        ImmutableSet.of(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE),
+        path, UnnestNode.class);
   }
 
   /**
@@ -222,10 +752,72 @@ public class CardinalityTest extends PlannerTestBase {
   protected void verifyCardinality(String query, long expected) {
     List<PlanFragment> plan = getPlan(query);
     PlanNode planRoot = plan.get(0).getPlanRoot();
+    assertEquals("Cardinality error for: " + query, expected,
+        planRoot.getCardinality());
+  }
+
+  protected void verifyCardinality(String query, long expected,
+      boolean isDistributedPlan, Set<PlannerTestOption> testOptions) {
+    List<PlanFragment> plan = getPlan(query, isDistributedPlan, testOptions);
+    PlanNode planRoot = plan.get(0).getPlanRoot();
+    assertEquals("Cardinality error for: " + query, expected,
+        planRoot.getCardinality());
+  }
+
+  /**
+   * The cardinality check performed by this method allows for a margin of
+   * error. Specifically, two cardinalities are considered equal as long as
+   * the difference is upper bounded by the expected number multiplied by
+   * CARDINALITY_TOLERANCE.
+   *
+   * @param query query to test
+   * @param expected expected cardinality at the root node
+   */
+  protected void verifyApproxCardinality(String query, long expected) {
+    List<PlanFragment> plan = getPlan(query);
+    PlanNode planRoot = plan.get(0).getPlanRoot();
+    assertEquals("Cardinality error for: " + query, expected,
+        planRoot.getCardinality(), expected * CARDINALITY_TOLERANCE);
+  }
+
+  /**
+   * This method allows us to inspect the cardinality of a PlanNode located by
+   * path with respect to the root of the retrieved query plan. The class of
+   * the PlanNode located by path is also checked against cl, the class of
+   * the PlanNode of interest.
+   * In addition, the cardinality check performed by this method allows for a
+   * margin of error. Specifically, two cardinalities are considered equal as
+   * long as the difference is upper bounded by the expected number multiplied
+   * by CARDINALITY_TOLERANCE.
+   *
+   * @param query query to test
+   * @param expected expected cardinality at the PlanNode of interest
+   * @param isDistributedPlan set to true if we would like to generate
+   * a distributed plan
+   * @param testOptions specified test options
+   * @param path path to the PlanNode of interest
+   * @param cl class of the PlanNode of interest
+   */
+  protected void verifyApproxCardinality(String query, long expected,
+      boolean isDistributedPlan, Set<PlannerTestOption> testOptions,
+      List<Integer> path, Class<?> cl) {
+
+    List<PlanFragment> plan = getPlan(query, isDistributedPlan, testOptions);
+    // We use the last element on the List of PlanFragment
+    // because this PlanFragment encloses all the PlanNodes
+    // in the query plan (either the single node plan or
+    // the distributed plan).
+    PlanNode currentNode = plan.get(plan.size() - 1).getPlanRoot();
+    for (Integer currentChildIndex: path) {
+      currentNode = currentNode.getChild(currentChildIndex);
+    }
+    assertEquals("PlanNode class not matched: ", cl.getName(),
+        currentNode.getClass().getName());
     assertEquals("Cardinality error for: " + query,
-        expected, planRoot.getCardinality());
+        expected, currentNode.getCardinality(), expected * CARDINALITY_TOLERANCE);
   }
 
+
   /**
    * Given a query, run the planner and extract the physical plan prior
    * to conversion to Thrift. Extract the first (or, more typically, only
@@ -258,4 +850,35 @@ public class CardinalityTest extends PlannerTestBase {
     }
     return planCtx.getPlan();
   }
+
+  private List<PlanFragment> getPlan(String query,
+      boolean isDistributedPlan, Set<PlannerTestOption> testOptions) {
+    TQueryCtx queryCtx = TestUtils.createQueryContext(
+        "default", System.getProperty("user.name"));
+    queryCtx.client_request.setStmt(query);
+
+    // Disable the attempt to compute an estimated number of rows in an
+    // hdfs table if DISABLE_HDFS_NUM_ROWS_ESTIMATE is among the test options.
+    TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
+    queryOptions.setDisable_hdfs_num_rows_estimate(
+        testOptions.contains(PlannerTestOption.DISABLE_HDFS_NUM_ROWS_ESTIMATE));
+    // Instruct the planner to generate a distributed plan
+    // by setting the query option of NUM_NODES to 0 if
+    // isDistributedPlan is set to true.
+    // Otherwise, set NUM_NODES to 1 to instruct the planner to
+    // create a single node plan.
+    queryOptions.setNum_nodes(isDistributedPlan ? 0 : 1);
+
+    // Plan the query, discard the actual execution plan, and
+    // return the plan tree.
+    PlanCtx planCtx = new PlanCtx(queryCtx);
+    planCtx.requestPlanCapture();
+    try {
+      frontend_.createExecRequest(planCtx);
+    } catch (ImpalaException e) {
+      fail(e.getMessage());
+    }
+    return planCtx.getPlan();
+  }
+
 }
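Editorial note on the CardinalityTest.java changes above: the path-based verifyApproxCardinality walks child indexes from the root of the last PlanFragment and then compares cardinalities within a margin of expected * CARDINALITY_TOLERANCE. The following is a minimal standalone sketch of those two steps, not the Impala code itself. The Node class, the TOLERANCE value of 0.05, and the sample tree are all hypothetical stand-ins, since the real CARDINALITY_TOLERANCE constant is not shown in this diff. The sketch also treats exact equality as a match so that expected values of 0 or -1 (where the margin is zero or negative) still compare sensibly; this is assumed to match how the delta-based assertEquals behaves.

```java
import java.util.Arrays;
import java.util.List;

class ApproxCardinalitySketch {
  // Hypothetical tolerance; the real CARDINALITY_TOLERANCE is defined
  // elsewhere in CardinalityTest and is not visible in this diff.
  public static final double TOLERANCE = 0.05;

  // Minimal stand-in for a PlanNode: a name, a cardinality, and children.
  public static final class Node {
    public final String name;
    public final long cardinality;
    public final List<Node> children;

    public Node(String name, long cardinality, Node... children) {
      this.name = name;
      this.cardinality = cardinality;
      this.children = Arrays.asList(children);
    }

    public Node getChild(int i) { return children.get(i); }
  }

  // Follows the child indexes in 'path' starting at 'root'.
  // An empty path returns the root itself.
  public static Node locate(Node root, List<Integer> path) {
    Node current = root;
    for (int childIndex : path) {
      current = current.getChild(childIndex);
    }
    return current;
  }

  // Two cardinalities match if they are equal, or if their difference is
  // bounded by expected * tolerance.
  public static boolean approxEquals(long expected, long actual, double tolerance) {
    if (expected == actual) return true;
    return Math.abs((double) expected - (double) actual) <= expected * tolerance;
  }

  // Hypothetical two-node plan: an exchange on top of a scan.
  public static Node sampleTree() {
    Node scan = new Node("scan", 2);
    return new Node("exchange", 2, scan);
  }

  public static void main(String[] args) {
    Node root = sampleTree();
    Node scan = locate(root, Arrays.asList(0));
    System.out.println(scan.name + " matches 2: "
        + approxEquals(2, scan.cardinality, TOLERANCE));
    System.out.println("-1 matches -1: " + approxEquals(-1, -1, TOLERANCE));
  }
}
```

With a relative margin, a small expected value such as 2 leaves almost no slack, which is consistent with the tests above listing the estimated cardinality they expect in a comment next to each call.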
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 4223e6c..7f9d448 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -217,7 +217,17 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testJoins() {
-    runPlannerTestFile("joins",
+    TQueryOptions options = defaultQueryOptions();
+    options.setDisable_hdfs_num_rows_estimate(false);
+    runPlannerTestFile("joins-hdfs-num-rows-est-enabled", options,
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
+  @Test
+  public void testJoinsWithHDFSNumRowsEstDisabled() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setDisable_hdfs_num_rows_estimate(true);
+    runPlannerTestFile("joins", options,
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
@@ -242,8 +252,20 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testFkPkJoinDetection() {
     // The FK/PK detection result is included in EXTENDED or higher.
+    TQueryOptions options = defaultQueryOptions();
+    options.setDisable_hdfs_num_rows_estimate(false);
+    runPlannerTestFile("fk-pk-join-detection-hdfs-num-rows-est-enabled",
+        options, ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
+  @Test
+  public void testFkPkJoinDetectionWithHDFSNumRowsEstDisabled() {
+    // The FK/PK detection result is included in EXTENDED or higher.
+    TQueryOptions options = defaultQueryOptions();
+    options.setDisable_hdfs_num_rows_estimate(true);
     runPlannerTestFile("fk-pk-join-detection",
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+        options, ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
             PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
@@ -283,7 +305,16 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testSubqueryRewrite() {
-    runPlannerTestFile("subquery-rewrite");
+    TQueryOptions options = defaultQueryOptions();
+    options.setDisable_hdfs_num_rows_estimate(false);
+    runPlannerTestFile("subquery-rewrite-hdfs-num-rows-est-enabled", options);
+  }
+
+  @Test
+  public void testSubqueryRewriteWithHDFSNumRowsEstDisabled() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setDisable_hdfs_num_rows_estimate(true);
+    runPlannerTestFile("subquery-rewrite", options);
   }
 
   @Test
@@ -495,6 +526,24 @@ public class PlannerTest extends PlannerTestBase {
     // throw a NotImplementedException otherwise (e.g. plan has a distributed join).
     TQueryOptions options = defaultQueryOptions();
     options.setMt_dop(3);
+    options.setDisable_hdfs_num_rows_estimate(false);
+    try {
+      // Temporarily unset the test env such that unsupported queries with mt_dop > 0
+      // throw an exception. Those are otherwise allowed for testing parallel plans.
+      RuntimeEnv.INSTANCE.setTestEnv(false);
+      runPlannerTestFile("mt-dop-validation-hdfs-num-rows-est-enabled", options);
+    } finally {
+      RuntimeEnv.INSTANCE.setTestEnv(true);
+    }
+  }
+
+  @Test
+  public void testMtDopValidationWithHDFSNumRowsEstDisabled() {
+    // Tests that queries supported with mt_dop > 0 produce a parallel plan, or
+    // throw a NotImplementedException otherwise (e.g. plan has a distributed join).
+    TQueryOptions options = defaultQueryOptions();
+    options.setMt_dop(3);
+    options.setDisable_hdfs_num_rows_estimate(true);
     try {
       // Temporarily unset the test env such that unsupported queries with mt_dop > 0
       // throw an exception. Those are otherwise allowed for testing parallel plans.
@@ -550,10 +599,11 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
-  public void testResourceRequirements() {
+  public void testResourceRequirementsWithHDFSNumRowsEstDisabled() {
     // Tests the resource requirement computation from the planner.
     TQueryOptions options = defaultQueryOptions();
     options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
+    options.setDisable_hdfs_num_rows_estimate(true);
     runPlannerTestFile("resource-requirements", options,
         ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
             PlannerTestOption.INCLUDE_EXPLAIN_HEADER,
@@ -561,12 +611,13 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
-  public void testSpillableBufferSizing() {
+  public void testSpillableBufferSizingWithHDFSNumRowsEstDisabled() {
     // Tests the resource requirement computation from the planner when it is allowed to
     // vary the spillable buffer size.
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
+    options.setDisable_hdfs_num_rows_estimate(true);
     runPlannerTestFile("spillable-buffer-sizing", options,
         ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
             PlannerTestOption.INCLUDE_EXPLAIN_HEADER,
@@ -613,7 +664,18 @@ public class PlannerTest extends PlannerTestBase {
   public void testDefaultJoinDistributionShuffleMode() {
     TQueryOptions options = defaultQueryOptions();
     options.setDefault_join_distribution_mode(TJoinDistributionMode.SHUFFLE);
-    runPlannerTestFile("default-join-distr-mode-shuffle", options);
+    options.setDisable_hdfs_num_rows_estimate(false);
+    runPlannerTestFile("default-join-distr-mode-shuffle-hdfs-num-rows-est-enabled",
+        options);
+  }
+
+  @Test
+  public void testDefaultJoinDistributionShuffleModeWithHDFSNumRowsEstDisabled() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setDefault_join_distribution_mode(TJoinDistributionMode.SHUFFLE);
+    options.setDisable_hdfs_num_rows_estimate(true);
+    runPlannerTestFile("default-join-distr-mode-shuffle",
+        options);
   }
 
   @Test
@@ -646,6 +708,15 @@ public class PlannerTest extends PlannerTestBase {
   public void testMinMaxRuntimeFilters() {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
+    options.setDisable_hdfs_num_rows_estimate(false);
+    runPlannerTestFile("min-max-runtime-filters-hdfs-num-rows-est-enabled", options);
+  }
+
+  @Test
+  public void testMinMaxRuntimeFiltersWithHDFSNumRowsEstDisabled() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    options.setDisable_hdfs_num_rows_estimate(true);
     runPlannerTestFile("min-max-runtime-filters", options);
   }
 
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 430c352..5823c54 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -837,7 +837,10 @@ public class PlannerTestBase extends FrontendTestBase {
     //
     // By default, this flag is disabled. So tests will ignore the values of 'HDFS',
     // 'S3', and 'ADLS' in the above explain plan.
-    VALIDATE_SCAN_FS
+    VALIDATE_SCAN_FS,
+    // If set, disables the attempt to compute an estimated number of rows in an
+    // hdfs table.
+    DISABLE_HDFS_NUM_ROWS_ESTIMATE
   }
 
   protected void runPlannerTestFile(String testFile, TQueryOptions options) {
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle-hdfs-num-rows-est-enabled.test
new file mode 100644
index 0000000..5ec75f8
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle-hdfs-num-rows-est-enabled.test
@@ -0,0 +1,76 @@
+# Each tested query in this file involves at least one hdfs table
+# without available statistics.
+# The following is the hdfs table without available statistics:
+# functional.tinytable
+# Both join inputs have an unknown cardinality.
+select /* +straight_join */ * from
+functional.tinytable x inner join functional.tinytable y on x.a = y.a
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: x.a = y.a
+|  runtime filters: RF000 <- y.a
+|  row-size=48B cardinality=2
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.tinytable y]
+|     HDFS partitions=1/1 files=1 size=38B
+|     row-size=24B cardinality=2
+|
+00:SCAN HDFS [functional.tinytable x]
+   HDFS partitions=1/1 files=1 size=38B
+   runtime filters: RF000 -> x.a
+   row-size=24B cardinality=2
+====
+# Left join input has an unknown cardinality.
+select /* +straight_join */ * from
+functional.tinytable x inner join functional.alltypes y on x.a = y.string_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: x.a = y.string_col
+|  runtime filters: RF000 <- y.string_col
+|  row-size=113B cardinality=2
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.alltypes y]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=89B cardinality=7.30K
+|
+00:SCAN HDFS [functional.tinytable x]
+   HDFS partitions=1/1 files=1 size=38B
+   runtime filters: RF000 -> x.a
+   row-size=24B cardinality=2
+====
+# Right join input has an unknown cardinality.
+select /* +straight_join */ * from
+functional.alltypes x inner join functional.tinytable y on x.string_col = y.a
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: x.string_col = y.a
+|  runtime filters: RF000 <- y.a
+|  row-size=113B cardinality=7.30K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.tinytable y]
+|     HDFS partitions=1/1 files=1 size=38B
+|     row-size=24B cardinality=2
+|
+00:SCAN HDFS [functional.alltypes x]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> x.string_col
+   row-size=89B cardinality=7.30K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
new file mode 100644
index 0000000..b8263e1
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
@@ -0,0 +1,86 @@
+# Each tested query in this file involves at least one hdfs table
+# without available statistics.
+# The following are the hdfs tables without available statistics:
+# tpcds_seq_snap.customer
+# tpcds_seq_snap.store_sales
+# Assumed FK/PK join due to missing stats on the rhs. Join cardinality is equal to
+# the lhs cardinality.
+select 1 from
+tpcds.store_sales inner join tpcds_seq_snap.customer
+on ss_customer_sk = c_customer_sk
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=178.94MB mem-reservation=18.94MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- c_customer_sk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=2.88M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpcds_seq_snap.customer]
+|     partitions=1/1 files=1 size=8.58MB
+|     stored statistics:
+|       table: rows=unavailable size=8.58MB
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=223.80K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=1824/1824 files=1824 size=346.60MB
+   runtime filters: RF000[bloom] -> ss_customer_sk
+   stored statistics:
+     table: rows=2.88M size=346.60MB
+     partitions: 1824/1824 rows=2.88M
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=130.09K
+   mem-estimate=128.00MB mem-reservation=8.00MB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=2.88M
+   in pipelines: 00(GETNEXT)
+====
+# Assumed FK/PK join due to missing stats on the lhs. Join cardinality is the estimated lhs cardinality.
+select /* +straight_join */ 1 from
+tpcds_seq_snap.store_sales inner join tpcds.customer
+on ss_customer_sk = c_customer_sk
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=178.94MB mem-reservation=18.94MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- c_customer_sk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=7.99M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpcds.customer]
+|     partitions=1/1 files=1 size=12.60MB
+|     stored statistics:
+|       table: rows=100.00K size=12.60MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=100.00K
+|     mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=100.00K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_seq_snap.store_sales]
+   partitions=1824/1824 files=1824 size=212.80MB
+   runtime filters: RF000[bloom] -> ss_customer_sk
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/1824 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=128.00MB mem-reservation=8.00MB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.99M
+   in pipelines: 00(GETNEXT)
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins-hdfs-num-rows-est-enabled.test
new file mode 100644
index 0000000..f711777
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins-hdfs-num-rows-est-enabled.test
@@ -0,0 +1,574 @@
+# Each tested query in this file involves at least one hdfs table
+# without available statistics.
+# The following are the hdfs tables without available statistics:
+# functional.testtbl
+# functional.alltypesnopart
+# functional.emptytable
+# functional.decimal_tbl
+select *
+from functional.testtbl t1 join functional.testtbl t2 using(id)
+where t1.zip = 94611
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.id = t2.id
+|  runtime filters: RF000 <- t2.id
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   predicates: t1.zip = 94611
+   runtime filters: RF000 -> t1.id
+   row-size=24B cardinality=0
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: t1.id = t2.id
+|  runtime filters: RF000 <- t2.id
+|  row-size=48B cardinality=0
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   predicates: t1.zip = 94611
+   runtime filters: RF000 -> t1.id
+   row-size=24B cardinality=0
+====
+# general exprs on both sides of equi-join predicates
+select *
+from functional.testtbl t1 left outer join functional.testtbl t2
+on (t1.id - 1 = t2.id + 1)
+where t1.zip = 94611
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: t1.id - 1 = t2.id + 1
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   predicates: t1.zip = 94611
+   row-size=24B cardinality=0
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: t1.id - 1 = t2.id + 1
+|  row-size=48B cardinality=0
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   predicates: t1.zip = 94611
+   row-size=24B cardinality=0
+====
+# left join followed by right join and then aggregate
+select x.tinyint_col, count(x.day)
+from (
+       select a.day day, c.tinyint_col tinyint_col
+       from functional.alltypesagg a
+            join functional.alltypessmall b using (id, int_col)
+            right outer join functional.alltypesnopart c on (b.id = c.id)
+            join functional.alltypesagg d on (a.id = d.id)
+       order by 1,2
+       limit 10
+     ) x
+where x.day >= 6
+group by x.tinyint_col
+order by 2
+limit 5
+---- PLAN
+PLAN-ROOT SINK
+|
+10:TOP-N [LIMIT=5]
+|  order by: count(x.`day`) ASC
+|  row-size=9B cardinality=1
+|
+09:AGGREGATE [FINALIZE]
+|  output: count(day)
+|  group by: tinyint_col
+|  row-size=9B cardinality=1
+|
+08:SELECT
+|  predicates: day >= 6
+|  row-size=5B cardinality=1
+|
+07:TOP-N [LIMIT=10]
+|  order by: day ASC, tinyint_col ASC
+|  row-size=5B cardinality=10
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: d.id = a.id
+|  runtime filters: RF000 <- a.id
+|  row-size=29B cardinality=113
+|
+|--05:HASH JOIN [RIGHT OUTER JOIN]
+|  |  hash predicates: b.id = c.id
+|  |  runtime filters: RF002 <- c.id
+|  |  row-size=25B cardinality=106
+|  |
+|  |--02:SCAN HDFS [functional.alltypesnopart c]
+|  |     partitions=1/1 files=0 size=0B
+|  |     row-size=5B cardinality=0
+|  |
+|  04:HASH JOIN [INNER JOIN]
+|  |  hash predicates: a.id = b.id, a.int_col = b.int_col
+|  |  runtime filters: RF004 <- b.id, RF005 <- b.int_col
+|  |  row-size=20B cardinality=106
+|  |
+|  |--01:SCAN HDFS [functional.alltypessmall b]
+|  |     partitions=4/4 files=4 size=6.32KB
+|  |     runtime filters: RF002 -> b.id
+|  |     row-size=8B cardinality=100
+|  |
+|  00:SCAN HDFS [functional.alltypesagg a]
+|     partitions=11/11 files=11 size=814.73KB
+|     runtime filters: RF002 -> a.id, RF004 -> a.id, RF005 -> a.int_col
+|     row-size=12B cardinality=11.00K
+|
+03:SCAN HDFS [functional.alltypesagg d]
+   partitions=11/11 files=11 size=814.73KB
+   runtime filters: RF000 -> d.id
+   row-size=4B cardinality=11.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:TOP-N [LIMIT=5]
+|  order by: count(x.`day`) ASC
+|  row-size=9B cardinality=1
+|
+09:AGGREGATE [FINALIZE]
+|  output: count(day)
+|  group by: tinyint_col
+|  row-size=9B cardinality=1
+|
+08:SELECT
+|  predicates: day >= 6
+|  row-size=5B cardinality=1
+|
+15:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: day ASC, tinyint_col ASC
+|  limit: 10
+|
+07:TOP-N [LIMIT=10]
+|  order by: day ASC, tinyint_col ASC
+|  row-size=5B cardinality=10
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: d.id = a.id
+|  runtime filters: RF000 <- a.id
+|  row-size=29B cardinality=113
+|
+|--14:EXCHANGE [BROADCAST]
+|  |
+|  05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  |  hash predicates: b.id = c.id
+|  |  runtime filters: RF002 <- c.id
+|  |  row-size=25B cardinality=106
+|  |
+|  |--13:EXCHANGE [HASH(c.id)]
+|  |  |
+|  |  02:SCAN HDFS [functional.alltypesnopart c]
+|  |     partitions=1/1 files=0 size=0B
+|  |     row-size=5B cardinality=0
+|  |
+|  12:EXCHANGE [HASH(b.id)]
+|  |
+|  04:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: a.id = b.id, a.int_col = b.int_col
+|  |  runtime filters: RF004 <- b.id, RF005 <- b.int_col
+|  |  row-size=20B cardinality=106
+|  |
+|  |--11:EXCHANGE [BROADCAST]
+|  |  |
+|  |  01:SCAN HDFS [functional.alltypessmall b]
+|  |     partitions=4/4 files=4 size=6.32KB
+|  |     runtime filters: RF002 -> b.id
+|  |     row-size=8B cardinality=100
+|  |
+|  00:SCAN HDFS [functional.alltypesagg a]
+|     partitions=11/11 files=11 size=814.73KB
+|     runtime filters: RF002 -> a.id, RF004 -> a.id, RF005 -> a.int_col
+|     row-size=12B cardinality=11.00K
+|
+03:SCAN HDFS [functional.alltypesagg d]
+   partitions=11/11 files=11 size=814.73KB
+   runtime filters: RF000 -> d.id
+   row-size=4B cardinality=11.00K
+====
+# join conjunct is derived from equivalence classes
+# (no explicit join conjunct between t1 and t2)
+select *
+from functional.testtbl t1, functional.testtbl t2, functional.testtbl t3
+where t1.id = t3.id and t2.id = t3.id
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.id = t3.id
+|  runtime filters: RF000 <- t3.id
+|  row-size=72B cardinality=0
+|
+|--02:SCAN HDFS [functional.testtbl t3]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.id = t2.id
+|  runtime filters: RF002 <- t2.id
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     runtime filters: RF000 -> t2.id
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   runtime filters: RF000 -> t1.id, RF002 -> t1.id
+   row-size=24B cardinality=0
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: t1.id = t3.id
+|  runtime filters: RF000 <- t3.id
+|  row-size=72B cardinality=0
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [functional.testtbl t3]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: t1.id = t2.id
+|  runtime filters: RF002 <- t2.id
+|  row-size=48B cardinality=0
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     runtime filters: RF000 -> t2.id
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   runtime filters: RF000 -> t1.id, RF002 -> t1.id
+   row-size=24B cardinality=0
+====
+# join involving a table with no table stats (functional.emptytable)
+# tests that the default join strategy is broadcast
+select * from functional.emptytable a inner join
+functional.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.int_col = a.f2
+|  runtime filters: RF000 <- a.f2
+|  row-size=105B cardinality=7.30K
+|
+|--00:SCAN HDFS [functional.emptytable a]
+|     partitions=0/0 files=0 size=0B
+|     row-size=16B cardinality=0
+|
+01:SCAN HDFS [functional.alltypes b]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> b.int_col
+   row-size=89B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: b.int_col = a.f2
+|  runtime filters: RF000 <- a.f2
+|  row-size=105B cardinality=7.30K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [functional.emptytable a]
+|     partitions=0/0 files=0 size=0B
+|     row-size=16B cardinality=0
+|
+01:SCAN HDFS [functional.alltypes b]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> b.int_col
+   row-size=89B cardinality=7.30K
+====
+# cross join
+select *
+from functional.testtbl t1 cross join functional.testtbl
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   row-size=24B cardinality=0
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=48B cardinality=0
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.testtbl]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   row-size=24B cardinality=0
+====
+# cross join with where clause
+select *
+from functional.testtbl t1 cross join functional.testtbl t2 where t1.id < t2.id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [INNER JOIN]
+|  predicates: t1.id < t2.id
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   row-size=24B cardinality=0
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
+|  predicates: t1.id < t2.id
+|  row-size=48B cardinality=0
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   row-size=24B cardinality=0
+====
+# Test joins with decimals with different precision and scale
+# Regression test for IMPALA-1121
+select straight_join count(*)
+from functional.decimal_tbl a join functional.decimal_tbl b on a.d1 = b.d5
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.d1 = b.d5
+|  runtime filters: RF000 <- b.d5
+|  row-size=12B cardinality=3
+|
+|--01:SCAN HDFS [functional.decimal_tbl b]
+|     HDFS partitions=1/1 files=1 size=195B
+|     row-size=8B cardinality=3
+|
+00:SCAN HDFS [functional.decimal_tbl a]
+   partitions=1/1 files=1 size=195B
+   runtime filters: RF000 -> a.d1
+   row-size=4B cardinality=3
+====
+# Test queries that appear earlier in this file, but substitute "<=>" or "IS DISTINCT
+# FROM" for "=" in the join predicates.
+select *
+from functional.testtbl t1 join functional.testtbl t2
+where t1.id <=> t2.id and t1.zip = 94611
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.id IS NOT DISTINCT FROM t2.id
+|  runtime filters: RF000 <- t2.id
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   predicates: t1.zip = 94611
+   runtime filters: RF000 -> t1.id
+   row-size=24B cardinality=0
+====
+select *
+from functional.testtbl t1 join functional.testtbl t2
+where t1.id is not distinct from t2.id and t1.zip = 94611
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.id IS NOT DISTINCT FROM t2.id
+|  runtime filters: RF000 <- t2.id
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   predicates: t1.zip = 94611
+   runtime filters: RF000 -> t1.id
+   row-size=24B cardinality=0
+====
+select *
+from functional.testtbl t1 join functional.testtbl t2
+where (t1.id IS DISTINCT FROM t2.id) and t1.zip = 94611
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [INNER JOIN]
+|  predicates: (t1.id IS DISTINCT FROM t2.id)
+|  row-size=48B cardinality=0
+|
+|--01:SCAN HDFS [functional.testtbl t2]
+|     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
+|
+00:SCAN HDFS [functional.testtbl t1]
+   partitions=1/1 files=0 size=0B
+   predicates: t1.zip = 94611
+   row-size=24B cardinality=0
+====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_gzip.emptytable a inner join
+functional_text_gzip.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.int_col = a.f2
+|  runtime filters: RF000 <- a.f2
+|  row-size=96B cardinality=3.57K
+|
+|--00:SCAN HDFS [functional_text_gzip.emptytable a]
+|     partitions=0/0 files=0 size=0B
+|     row-size=16B cardinality=0
+|
+01:SCAN HDFS [functional_text_gzip.alltypes b]
+   HDFS partitions=24/24 files=24 size=77.88KB
+   runtime filters: RF000 -> b.int_col
+   row-size=80B cardinality=3.57K
+====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_bzip.emptytable a inner join
+functional_text_bzip.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.int_col = a.f2
+|  runtime filters: RF000 <- a.f2
+|  row-size=96B cardinality=2.58K
+|
+|--00:SCAN HDFS [functional_text_bzip.emptytable a]
+|     partitions=0/0 files=0 size=0B
+|     row-size=16B cardinality=0
+|
+01:SCAN HDFS [functional_text_bzip.alltypes b]
+   HDFS partitions=24/24 files=24 size=56.23KB
+   runtime filters: RF000 -> b.int_col
+   row-size=80B cardinality=2.58K
+====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_lzo.emptytable a inner join
+functional_text_lzo.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.int_col = a.f2
+|  runtime filters: RF000 <- a.f2
+|  row-size=96B cardinality=5.65K
+|
+|--00:SCAN HDFS [functional_text_lzo.emptytable a]
+|     partitions=0/0 files=0 size=0B
+|     row-size=16B cardinality=0
+|
+01:SCAN HDFS [functional_text_lzo.alltypes b]
+   HDFS partitions=24/24 files=24 size=123.32KB
+   runtime filters: RF000 -> b.int_col
+   row-size=80B cardinality=5.65K
+====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_snap.emptytable a inner join
+functional_text_snap.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.int_col = a.f2
+|  runtime filters: RF000 <- a.f2
+|  row-size=96B cardinality=5.55K
+|
+|--00:SCAN HDFS [functional_text_snap.emptytable a]
+|     partitions=0/0 files=0 size=0B
+|     row-size=16B cardinality=0
+|
+01:SCAN HDFS [functional_text_snap.alltypes b]
+   HDFS partitions=24/24 files=24 size=121.15KB
+   runtime filters: RF000 -> b.int_col
+   row-size=80B cardinality=5.55K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index 1321c6a..944ddf0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -3009,3 +3009,91 @@ PLAN-ROOT SINK
    constant-operands=1
    row-size=16B cardinality=1
 ====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_gzip.emptytable a inner join
+functional_text_gzip.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.f2 = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|  row-size=96B cardinality=0
+|
+|--01:SCAN HDFS [functional_text_gzip.alltypes b]
+|     HDFS partitions=24/24 files=24 size=77.88KB
+|     row-size=80B cardinality=unavailable
+|
+00:SCAN HDFS [functional_text_gzip.emptytable a]
+   partitions=0/0 files=0 size=0B
+   runtime filters: RF000 -> a.f2
+   row-size=16B cardinality=0
+====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_bzip.emptytable a inner join
+functional_text_bzip.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.f2 = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|  row-size=96B cardinality=0
+|
+|--01:SCAN HDFS [functional_text_bzip.alltypes b]
+|     HDFS partitions=24/24 files=24 size=56.23KB
+|     row-size=80B cardinality=unavailable
+|
+00:SCAN HDFS [functional_text_bzip.emptytable a]
+   partitions=0/0 files=0 size=0B
+   runtime filters: RF000 -> a.f2
+   row-size=16B cardinality=0
+====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_lzo.emptytable a inner join
+functional_text_lzo.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.f2 = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|  row-size=96B cardinality=0
+|
+|--01:SCAN HDFS [functional_text_lzo.alltypes b]
+|     HDFS partitions=24/24 files=24 size=123.32KB
+|     row-size=80B cardinality=unavailable
+|
+00:SCAN HDFS [functional_text_lzo.emptytable a]
+   partitions=0/0 files=0 size=0B
+   runtime filters: RF000 -> a.f2
+   row-size=16B cardinality=0
+====
+# join involving tables with no table stats
+# one of the tables (alltypes) is a compressed text file
+# tests that the default join strategy is broadcast
+select * from functional_text_snap.emptytable a inner join
+functional_text_snap.alltypes b on a.f2 = b.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.f2 = b.int_col
+|  runtime filters: RF000 <- b.int_col
+|  row-size=96B cardinality=0
+|
+|--01:SCAN HDFS [functional_text_snap.alltypes b]
+|     HDFS partitions=24/24 files=24 size=121.15KB
+|     row-size=80B cardinality=unavailable
+|
+00:SCAN HDFS [functional_text_snap.emptytable a]
+   partitions=0/0 files=0 size=0B
+   runtime filters: RF000 -> a.f2
+   row-size=16B cardinality=0
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
new file mode 100644
index 0000000..6a1d531
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
@@ -0,0 +1,59 @@
+# Each tested query in this file involves at least one hdfs table
+# without available statistics.
+# The following are the hdfs tables without available statistics:
+# functional_parquet.alltypes
+# Query with both Kudu and HDFS filter targets.
+select count(*) from functional_kudu.alltypes a, functional_parquet.alltypes b,
+    functional_kudu.alltypes c
+where a.int_col = b.int_col and a.int_col = c.int_col
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=31.88MB mem-reservation=5.89MB thread-reservation=4 runtime-filters-memory=2.00MB
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=3 row-size=8B cardinality=1
+|  in pipelines: 05(GETNEXT), 01(OPEN)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col
+|  fk/pk conjuncts: none
+|  runtime filters: RF000[bloom] <- c.int_col, RF001[min_max] <- c.int_col
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=1,0,2 row-size=12B cardinality=9.31M
+|  in pipelines: 01(GETNEXT), 02(OPEN)
+|
+|--02:SCAN KUDU [functional_kudu.alltypes c]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=2 row-size=4B cardinality=7.30K
+|     in pipelines: 02(GETNEXT)
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: b.int_col = a.int_col
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF002[bloom] <- a.int_col
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=1,0 row-size=8B cardinality=12.75K
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+|--00:SCAN KUDU [functional_kudu.alltypes a]
+|     runtime filters: RF001[min_max] -> a.int_col
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=0 row-size=4B cardinality=7.30K
+|     in pipelines: 00(GETNEXT)
+|
+01:SCAN HDFS [functional_parquet.alltypes b]
+   HDFS partitions=24/24 files=24 size=200.43KB
+   runtime filters: RF000[bloom] -> b.int_col, RF002[bloom] -> b.int_col
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=1 row-size=4B cardinality=12.75K
+   in pipelines: 01(GETNEXT)
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
new file mode 100644
index 0000000..cc56083
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
@@ -0,0 +1,281 @@
+# Each tested query in this file involves at least one hdfs table
+# without available statistics.
+# The following are the hdfs tables without available statistics:
+# functional_parquet.alltypestiny
+# functional_parquet.alltypes
+# Distributed nested-loop join not allowed.
+select count(*) from
+functional_parquet.alltypestiny a,
+functional_parquet.alltypestiny b
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=42.00MB mem-reservation=16.00KB thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=0,1 row-size=0B cardinality=550.56K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|     HDFS partitions=4/4 files=4 size=11.67KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/4 rows=unavailable
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=0
+|     tuple-ids=1 row-size=0B cardinality=742
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypestiny a]
+   HDFS partitions=4/4 files=4 size=11.67KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/4 rows=unavailable
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=0
+   tuple-ids=0 row-size=0B cardinality=742
+   in pipelines: 00(GETNEXT)
+---- PARALLELPLANS
+NotImplementedException: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Distributed hash-join not allowed.
+select count(*) from
+functional_parquet.alltypestiny a,
+functional_parquet.alltypestiny b
+where a.id = b.id
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=28.94MB mem-reservation=2.95MB thread-reservation=1 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=742
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|     HDFS partitions=4/4 files=4 size=11.67KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/4 rows=unavailable
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=0
+|     tuple-ids=1 row-size=4B cardinality=742
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypestiny a]
+   HDFS partitions=4/4 files=4 size=11.67KB
+   runtime filters: RF000[bloom] -> a.id
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/4 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=0
+   tuple-ids=0 row-size=4B cardinality=742
+   in pipelines: 00(GETNEXT)
+---- PARALLELPLANS
+NotImplementedException: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Single-table scan/filter/agg/topn should work.
+select count(int_col) cnt from functional_parquet.alltypes
+where id < 10
+group by bigint_col
+order by cnt, bigint_col
+limit 10
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=144.00MB mem-reservation=34.02MB thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+02:TOP-N [LIMIT=10]
+|  order by: count(int_col) ASC, bigint_col ASC
+|  mem-estimate=160B mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=16B cardinality=10
+|  in pipelines: 02(GETNEXT), 01(OPEN)
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(int_col)
+|  group by: bigint_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   HDFS partitions=24/24 files=24 size=200.43KB
+   predicates: id < CAST(10 AS INT)
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   parquet statistics predicates: id < CAST(10 AS INT)
+   parquet dictionary predicates: id < CAST(10 AS INT)
+   mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0
+   tuple-ids=0 row-size=16B cardinality=1.27K
+   in pipelines: 00(GETNEXT)
+---- PARALLELPLANS
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+05:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: count(int_col) ASC, bigint_col ASC
+|  limit: 10
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=16B cardinality=10
+|  in pipelines: 02(GETNEXT)
+|
+F01:PLAN FRAGMENT [HASH(bigint_col)] hosts=3 instances=9
+Per-Host Resources: mem-estimate=384.55MB mem-reservation=102.00MB thread-reservation=3
+02:TOP-N [LIMIT=10]
+|  order by: count(int_col) ASC, bigint_col ASC
+|  mem-estimate=160B mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=16B cardinality=10
+|  in pipelines: 02(GETNEXT), 04(OPEN)
+|
+04:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: bigint_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  in pipelines: 04(GETNEXT), 00(OPEN)
+|
+03:EXCHANGE [HASH(bigint_col)]
+|  mem-estimate=186.64KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
+Per-Host Resources: mem-estimate=432.00MB mem-reservation=102.07MB thread-reservation=3
+01:AGGREGATE [STREAMING]
+|  output: count(int_col)
+|  group by: bigint_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  in pipelines: 00(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=200.43KB
+   predicates: id < CAST(10 AS INT)
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   parquet statistics predicates: id < CAST(10 AS INT)
+   parquet dictionary predicates: id < CAST(10 AS INT)
+   mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0
+   tuple-ids=0 row-size=16B cardinality=1.27K
+   in pipelines: 00(GETNEXT)
+====
+# Single-table scan/filter/analytic should work.
+select row_number() over(partition by int_col order by id)
+from functional_parquet.alltypes
+where id < 10
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=26.00MB mem-reservation=10.02MB thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: int_col
+|  order by: id ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=4,3 row-size=16B cardinality=1.27K
+|  in pipelines: 01(GETNEXT)
+|
+01:SORT
+|  order by: int_col ASC NULLS FIRST, id ASC
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=4 row-size=8B cardinality=1.27K
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   HDFS partitions=24/24 files=24 size=200.43KB
+   predicates: id < CAST(10 AS INT)
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   parquet statistics predicates: id < CAST(10 AS INT)
+   parquet dictionary predicates: id < CAST(10 AS INT)
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=8B cardinality=1.27K
+   in pipelines: 00(GETNEXT)
+---- PARALLELPLANS
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=222.64KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=222.64KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4,3 row-size=16B cardinality=1.27K
+|  in pipelines: 01(GETNEXT)
+|
+F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=9
+Per-Host Resources: mem-estimate=30.33MB mem-reservation=30.00MB thread-reservation=3
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: int_col
+|  order by: id ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=4,3 row-size=16B cardinality=1.27K
+|  in pipelines: 01(GETNEXT)
+|
+01:SORT
+|  order by: int_col ASC NULLS FIRST, id ASC
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=4 row-size=8B cardinality=1.27K
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+03:EXCHANGE [HASH(int_col)]
+|  mem-estimate=111.32KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=8B cardinality=1.27K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
+Per-Host Resources: mem-estimate=48.00MB mem-reservation=48.00KB thread-reservation=3
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=200.43KB
+   predicates: id < CAST(10 AS INT)
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   parquet statistics predicates: id < CAST(10 AS INT)
+   parquet dictionary predicates: id < CAST(10 AS INT)
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=8B cardinality=1.27K
+   in pipelines: 00(GETNEXT)
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
index 3283ddd..2b7bfd3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
@@ -527,7 +527,7 @@ Per-Host Resources: mem-estimate=2.02GB mem-reservation=34.09MB thread-reservati
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
 |       partitions: 0/4 rows=unavailable
-|       columns: unavailable
+|       columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
 |     tuple-ids=1 row-size=80B cardinality=unavailable
@@ -590,7 +590,7 @@ Per-Host Resources: mem-estimate=4.03GB mem-reservation=68.17MB thread-reservati
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
 |       partitions: 0/4 rows=unavailable
-|       columns: unavailable
+|       columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=0
 |     tuple-ids=1 row-size=80B cardinality=unavailable
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite-hdfs-num-rows-est-enabled.test
new file mode 100644
index 0000000..e49e9d2
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite-hdfs-num-rows-est-enabled.test
@@ -0,0 +1,31 @@
+# Each tested query in this file involves at least one hdfs table
+# without available statistics.
+# The following is the hdfs table without available statistics:
+# functional.tinyinttable
+# Constant on LHS of IN for nested subqueries (no correlation)
+select * from functional.alltypes t where 1 in
+(select int_col from functional.tinyinttable where
+ 1 in (select int_col from functional.alltypestiny))
+---- PLAN
+PLAN-ROOT SINK
+|
+04:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=7.30K
+|
+|--03:NESTED LOOP JOIN [RIGHT SEMI JOIN]
+|  |  row-size=4B cardinality=1
+|  |
+|  |--01:SCAN HDFS [functional.tinyinttable]
+|  |     partitions=1/1 files=1 size=20B
+|  |     predicates: 1 = functional.tinyinttable.int_col
+|  |     row-size=4B cardinality=1
+|  |
+|  02:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|     predicates: 1 = functional.alltypestiny.int_col
+|     row-size=4B cardinality=4
+|
+00:SCAN HDFS [functional.alltypes t]
+   partitions=24/24 files=24 size=478.45KB
+   row-size=89B cardinality=7.30K
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
index 5a23ee9..4da1ac1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
@@ -47,6 +47,20 @@ set num_scanner_threads=2;
 select * from functional_parquet.alltypes A, functional_parquet.alltypes B where
  A.int_col = B.int_col limit 1;
 ---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=67MB.*
+row_regex: .*Cluster Memory Admitted: 66.94 MB.*
+====
+---- QUERY
+# No mem_limit set
+# Upper bound enforced by pool.max_query_mem_limit
+set request_pool=regularPool;
+# set this to make estimates deterministic.
+set num_scanner_threads=2;
+# Disable the estimation of cardinality for an hdfs table without stats.
+set DISABLE_HDFS_NUM_ROWS_ESTIMATE=1;
+select * from functional_parquet.alltypes A, functional_parquet.alltypes B where
+ A.int_col = B.int_col limit 1;
+---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=2.06GB.*
 row_regex: .*Cluster Memory Admitted: 1.50 GB.*
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index 9fc57ea..887001e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -66,6 +66,20 @@ from functional_avro.alltypes t1
   left outer join functional_avro.alltypes t3 on (t2.id = t3.id)
 where t1.month = 1 and t2.year = 2009 and t3.bool_col = false
 ---- RESULTS: VERIFY_IS_SUBSET
+'Per-Host Resource Estimates: Memory=74MB'
+'WARNING: The following tables are missing relevant table and/or column statistics.'
+'functional_avro.alltypes, functional_parquet.alltypessmall'
+====
+---- QUERY
+# Tests the warning about missing table stats in the explain header.
+# Disable the estimation of cardinality for an hdfs table without stats.
+set DISABLE_HDFS_NUM_ROWS_ESTIMATE=1;
+explain select count(t1.int_col), avg(t2.float_col), sum(t3.bigint_col)
+from functional_avro.alltypes t1
+  inner join functional_parquet.alltypessmall t2 on (t1.id = t2.id)
+  left outer join functional_avro.alltypes t3 on (t2.id = t3.id)
+where t1.month = 1 and t2.year = 2009 and t3.bool_col = false
+---- RESULTS: VERIFY_IS_SUBSET
 'Per-Host Resource Estimates: Memory=4.07GB'
 'WARNING: The following tables are missing relevant table and/or column statistics.'
 'functional_avro.alltypes, functional_parquet.alltypessmall'
diff --git a/testdata/workloads/functional-query/queries/QueryTest/inline-view.test b/testdata/workloads/functional-query/queries/QueryTest/inline-view.test
index 49b2e91..7a2bfae 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/inline-view.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/inline-view.test
@@ -145,6 +145,37 @@ from (
   ) t1
 join alltypes t2 on (t1.int_col = t2.id)
 where month = 1
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+0,3,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,3,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,3,2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
+3,3,3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1
+4,3,4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1
+---- DBAPI_RESULTS: VERIFY_IS_EQUAL_SORTED
+0,3,0,True,0,0,0,0,0.0,0.0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,3,1,False,1,1,1,10,1.10000002384,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,3,2,True,2,2,2,20,2.20000004768,20.2,'01/01/09','2',2009-01-01 00:02:00.100000,2009,1
+3,3,3,False,3,3,3,30,3.29999995232,30.3,'01/01/09','3',2009-01-01 00:03:00.300000,2009,1
+4,3,4,True,4,4,4,40,4.40000009537,40.4,'01/01/09','4',2009-01-01 00:04:00.600000,2009,1
+---- TYPES
+int, bigint, int, boolean, tinyint, smallint, int, bigint, float, double, string, string, timestamp, int, int
+====
+---- QUERY
+# subquery with aggregation and order by/limit, as left-hand side of join;
+# having clause in subquery is transferred to merge agg step in distrib plan
+# Disable the estimation of cardinality for an hdfs table without stats.
+set DISABLE_HDFS_NUM_ROWS_ESTIMATE=1;
+select *
+from (
+  select int_col, count(*)
+  from alltypessmall
+  where month = 1
+  group by int_col
+  having count(*) > 2
+  order by count(*) desc, int_col limit 5
+  ) t1
+join alltypes t2 on (t1.int_col = t2.id)
+where month = 1
 ---- RESULTS
 0,3,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
 1,3,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index ff13999..1365ef8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -400,6 +400,19 @@ SET BUFFER_POOL_LIMIT=290MB;
 select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
     on a.month = b.id and b.int_col = -3
 ---- RESULTS
+====
+---- QUERY
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+SET RUNTIME_FILTER_MIN_SIZE=128MB;
+SET RUNTIME_FILTER_MAX_SIZE=500MB;
+# Disable the estimation of cardinality for an hdfs table without stats.
+SET DISABLE_HDFS_NUM_ROWS_ESTIMATE=1;
+# Query would have been admitted if memory for runtime filters was not accounted for.
+SET BUFFER_POOL_LIMIT=290MB;
+select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
+    on a.month = b.id and b.int_col = -3
+---- RESULTS
 ---- CATCH
 row_regex:.*minimum memory reservation on backend '.*' is greater than memory available to
  the query for buffer reservations\. Increase the buffer_pool_limit to 290.17 MB\. See
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index 31ab8ef..1a5f633 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -26,6 +26,7 @@ set all;
 'REQUEST_POOL','','REGULAR'
 'SYNC_DDL','0','REGULAR'
 'DEFAULT_FILE_FORMAT','TEXT','REGULAR'
+'DISABLE_HDFS_NUM_ROWS_ESTIMATE','0','REGULAR'
 ---- TYPES
 STRING, STRING, STRING
 ====
@@ -52,6 +53,7 @@ set all;
 'REQUEST_POOL','','REGULAR'
 'SYNC_DDL','0','REGULAR'
 'DEFAULT_FILE_FORMAT','TEXT','REGULAR'
+'DISABLE_HDFS_NUM_ROWS_ESTIMATE','0','REGULAR'
 ---- TYPES
 STRING, STRING, STRING
 ====
@@ -78,6 +80,7 @@ set all;
 'REQUEST_POOL','','REGULAR'
 'SYNC_DDL','0','REGULAR'
 'DEFAULT_FILE_FORMAT','TEXT','REGULAR'
+'DISABLE_HDFS_NUM_ROWS_ESTIMATE','0','REGULAR'
 ---- TYPES
 STRING, STRING, STRING
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 1ae0ea7..6183f91 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -7,7 +7,20 @@ insert into alltypes partition(year, month)
 select * from functional_parquet.alltypes where year = 2009;
 ====
 ---- QUERY
-# No stats are available.
+explain select id from alltypes;
+---- RESULTS: VERIFY_IS_SUBSET
+'   stored statistics:'
+'     table: rows=unavailable size=unavailable'
+'     partitions: 0/12 rows=unavailable'
+'     columns: unavailable'
+row_regex:.* extrapolated-rows=unavailable.*
+'   tuple-ids=0 row-size=4B cardinality=5.92K'
+---- TYPES
+STRING
+====
+---- QUERY
+# Disable the estimation of cardinality for an hdfs table without stats.
+SET DISABLE_HDFS_NUM_ROWS_ESTIMATE=1;
 explain select id from alltypes;
 ---- RESULTS: VERIFY_IS_SUBSET
 '   stored statistics:'
@@ -203,8 +216,24 @@ explain select id from alltypes;
 '     partitions: 0/24 rows=unavailable'
 '     columns: unavailable'
 row_regex:.* extrapolated-rows=unavailable.*
-'   tuple-ids=0 row-size=4B cardinality=unavailable'
+'   tuple-ids=0 row-size=4B cardinality=17.76K'
 '   in pipelines: 00(GETNEXT)'
 ---- TYPES
 STRING
 ====
+---- QUERY
+# Test that dropping stats resets everything.
+SET DISABLE_HDFS_NUM_ROWS_ESTIMATE=1;
+drop stats alltypes;
+explain select id from alltypes;
+---- RESULTS: VERIFY_IS_SUBSET
+'   stored statistics:'
+'     table: rows=unavailable size=unavailable'
+'     partitions: 0/24 rows=unavailable'
+'     columns: unavailable'
+row_regex:.* extrapolated-rows=unavailable.*
+'   tuple-ids=0 row-size=4B cardinality=unavailable'
+'   in pipelines: 00(GETNEXT)'
+---- TYPES
+STRING
+====
\ No newline at end of file


[impala] 01/04: IMPALA-8443: Record time spent in authorization in the runtime profile

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 65175a23b2ed73d91734607d474ba80e15323c9b
Author: Tamas Mate <tm...@cloudera.com>
AuthorDate: Mon Jun 17 08:30:27 2019 -0700

    IMPALA-8443: Record time spent in authorization in the runtime profile
    
    The analysis and authorization is handled together as authorization
    depends on the results of the analysis. The timeline EventSequence is
    moved into the AuthorizationContext and the markEvent is called during
    the postAuthorize method.
    
    In some cases the EventSequence is not available when the
    AuthorizationContext is created therefore it is wrapped in Optional.
    
    Change-Id: I5bb85e57fcc75d41f3eb2911e6d375e0da6f82ae
    Reviewed-on: http://gerrit.cloudera.org:8080/13353
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/AnalysisContext.java   |  4 +++-
 .../impala/authorization/AuthorizationChecker.java    |  6 +++++-
 .../impala/authorization/AuthorizationContext.java    | 15 ++++++++++++---
 .../authorization/BaseAuthorizationChecker.java       | 19 +++++++++++++------
 .../authorization/NoopAuthorizationFactory.java       |  6 ++++--
 .../ranger/RangerAuthorizationChecker.java            | 10 +++++++---
 .../ranger/RangerAuthorizationContext.java            |  6 +++++-
 .../sentry/SentryAuthorizationChecker.java            |  6 ++++--
 .../main/java/org/apache/impala/service/Frontend.java |  3 +--
 .../java/org/apache/impala/util/EventSequence.java    |  7 +++++--
 .../org/apache/impala/common/FrontendTestBase.java    |  7 +++++--
 tests/observability/test_log_fragments.py             |  3 ++-
 tests/query_test/test_observability.py                | 19 ++++++++++++++++++-
 13 files changed, 84 insertions(+), 27 deletions(-)
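
The commit message above describes passing the timeline into the authorization
context wrapped in Optional, and marking an event in postAuthorize only when a
timeline is present. The following is a minimal, self-contained sketch of that
pattern, not the Impala code itself: Timeline and AuthzContext are simplified,
hypothetical stand-ins for EventSequence and AuthorizationContext, and the
postAuthorize helper here returns a value purely so the behavior is easy to
inspect. It also assumes the value returned by markEvent is in nanoseconds and
converts it to milliseconds with TimeUnit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class AuthzTimelineSketch {
  /** Hypothetical, simplified stand-in for org.apache.impala.util.EventSequence. */
  public static class Timeline {
    private final long startNs = System.nanoTime();
    private final List<String> labels = new ArrayList<>();
    private final List<Long> offsetsNs = new ArrayList<>();

    /** Records an event and returns its offset from the timeline start, in ns. */
    public long markEvent(String label) {
      long offsetNs = System.nanoTime() - startNs;
      offsetsNs.add(offsetNs);
      labels.add(label);
      return offsetNs;
    }

    public List<String> labels() { return labels; }
  }

  /** Hypothetical, simplified stand-in for AuthorizationContext. */
  public static class AuthzContext {
    private final Optional<Timeline> timeline;

    public AuthzContext(Optional<Timeline> timeline) { this.timeline = timeline; }

    public Optional<Timeline> getTimeline() { return timeline; }
  }

  /**
   * Marks "Authorization finished (<provider>)" if a timeline is present and
   * returns the event offset in milliseconds; returns empty when the caller
   * did not supply a timeline (for example, a plain access check).
   */
  public static Optional<Long> postAuthorize(AuthzContext ctx, String providerName) {
    return ctx.getTimeline().map(t ->
        TimeUnit.NANOSECONDS.toMillis(
            t.markEvent(String.format("Authorization finished (%s)", providerName))));
  }

  public static void main(String[] args) {
    // Query path: a timeline exists, so the event is recorded.
    Timeline timeline = new Timeline();
    timeline.markEvent("Analysis finished");
    AuthzContext withTimeline = new AuthzContext(Optional.of(timeline));
    System.out.println(postAuthorize(withTimeline, "noop").isPresent());
    System.out.println(timeline.labels());

    // Access-check path: no timeline, so nothing is recorded.
    AuthzContext withoutTimeline = new AuthzContext(Optional.empty());
    System.out.println(postAuthorize(withoutTimeline, "noop").isPresent());
  }
}
```

Using Optional rather than a nullable field makes the "no timeline" case
explicit at every call site, which matches the hasAccess path in the diff
below, where Optional.empty() is passed.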

diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 1c1e059..2353bfc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -21,6 +21,7 @@ import static org.apache.impala.analysis.ToSqlOptions.REWRITTEN;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
@@ -415,6 +416,7 @@ public class AnalysisContext {
     } catch (AnalysisException e) {
       analysisException = e;
     }
+    timeline_.markEvent("Analysis finished");
 
     // Authorize statement and record exception. Authorization relies on information
     // collected during analysis.
@@ -425,7 +427,7 @@ public class AnalysisContext {
       authzCtx = authzChecker.createAuthorizationContext(true,
           clientRequest.isSetRedacted_stmt() ?
               clientRequest.getRedacted_stmt() : clientRequest.getStmt(),
-          queryCtx_.getSession());
+          queryCtx_.getSession(), Optional.of(timeline_));
       authzChecker.authorize(authzCtx, analysisResult_, catalog_);
     } catch (AuthorizationException e) {
       authException = e;
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
index e582d2a..8038913 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
@@ -21,7 +21,9 @@ import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TSessionState;
+import org.apache.impala.util.EventSequence;
 
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -42,9 +44,11 @@ public interface AuthorizationChecker {
    * @param doAudits a flag whether or not to do the audits
    * @param sqlStmt the SQL statement to be logged for auditing
    * @param sessionState the client session state
+   * @param timeline optional timeline to mark events in the query profile
    */
   AuthorizationContext createAuthorizationContext(boolean doAudits, String sqlStmt,
-      TSessionState sessionState) throws InternalException;
+      TSessionState sessionState, Optional<EventSequence> timeline)
+      throws InternalException;
 
   /**
    * Authorize an analyzed statement.
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationContext.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationContext.java
index 3ed59ce..1930dc2 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationContext.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationContext.java
@@ -17,14 +17,23 @@
 
 package org.apache.impala.authorization;
 
+import org.apache.impala.util.EventSequence;
+
+import java.util.Optional;
+
 /**
  * An authorization context class that is created per authorization check.
  */
 public class AuthorizationContext {
-  private final long startTime_ = System.currentTimeMillis();
+  private final Optional<EventSequence> timeline_;
+
+  public AuthorizationContext(Optional<EventSequence> timeline) {
+    this.timeline_ = timeline;
+  }
 
   /**
-   * Gets the start time when the authorization check started.
+   * Gets the timeline which can be used to mark events in the query profile.
    */
-  public long getStartTime() { return startTime_; }
+  public Optional<EventSequence> getTimeline() { return timeline_; }
+
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
index 255a184..2104ed7 100644
--- a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
@@ -26,6 +26,7 @@ import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.util.EventSequence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +36,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Optional;
 
 /**
  * A base class for the {@link AuthorizationChecker}.
@@ -60,11 +62,12 @@ public abstract class BaseAuthorizationChecker implements AuthorizationChecker {
    */
   @Override
   public boolean hasAccess(User user, PrivilegeRequest request) throws InternalException {
-    // We don't want to do an audit log here. This method is used by "show databases",
-    // "show tables", "describe" to filter out unauthorized database, table, or column
-    // names.
+    // We don't want to write an audit log or mark profile events here. This method is
+    // used by "show databases", "show tables", "describe" to filter out unauthorized
+    // database, table, or column names.
     return hasAccess(createAuthorizationContext(false /*no audit log*/,
-        null /*no SQL statement*/, null /*no session state*/), user, request);
+        null /*no SQL statement*/, null /*no session state*/,
+        Optional.empty()), user, request);
   }
 
   private boolean hasAccess(AuthorizationContext authzCtx, User user,
@@ -86,8 +89,12 @@ public abstract class BaseAuthorizationChecker implements AuthorizationChecker {
    */
   @Override
   public void postAuthorize(AuthorizationContext authzCtx) {
-    long durationMs = System.currentTimeMillis() - authzCtx.getStartTime();
-    LOG.debug("Authorization check took {} ms", durationMs);
+    if (authzCtx.getTimeline().isPresent()) {
+      EventSequence timeline = authzCtx.getTimeline().get();
+      long durationMs = timeline.markEvent(String.format("Authorization finished (%s)",
+          config_.getProviderName())) / 1000000;
+      LOG.debug("Authorization check took {} ms", durationMs);
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
index 2a371ea..33f2414 100644
--- a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
@@ -36,9 +36,11 @@ import org.apache.impala.thrift.TShowGrantPrincipalParams;
 import org.apache.impala.thrift.TShowRolesParams;
 import org.apache.impala.thrift.TShowRolesResult;
 import org.apache.impala.util.ClassUtil;
+import org.apache.impala.util.EventSequence;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -207,8 +209,8 @@ public class NoopAuthorizationFactory implements AuthorizationFactory {
 
       @Override
       public AuthorizationContext createAuthorizationContext(boolean doAudits,
-          String sqlStmt, TSessionState sessionState) {
-        return new AuthorizationContext();
+          String sqlStmt, TSessionState sessionState, Optional<EventSequence> timeline) {
+        return new AuthorizationContext(timeline);
       }
     };
   }
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index 13e8261..7f76cad 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -35,6 +35,7 @@ import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TSessionState;
+import org.apache.impala.util.EventSequence;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
@@ -48,6 +49,7 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -208,8 +210,10 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
 
   @Override
   public AuthorizationContext createAuthorizationContext(boolean doAudits,
-      String sqlStmt, TSessionState sessionState) throws InternalException {
-    RangerAuthorizationContext authzCtx = new RangerAuthorizationContext(sessionState);
+      String sqlStmt, TSessionState sessionState, Optional<EventSequence> timeline)
+      throws InternalException {
+    RangerAuthorizationContext authzCtx =
+        new RangerAuthorizationContext(sessionState, timeline);
     if (doAudits) {
       // Any statement that goes through {@link authorize} will need to have audit logs.
       if (sqlStmt != null) {
@@ -235,7 +239,7 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
     // case 4: table (select) ERROR, columns (select) ERROR --> only add the first column
     //                                                          event
     RangerAuthorizationContext tmpCtx = new RangerAuthorizationContext(
-        originalCtx.getSessionState());
+        originalCtx.getSessionState(), originalCtx.getTimeline());
     tmpCtx.setAuditHandler(new RangerBufferAuditHandler(
         originalAuditHandler.getSqlStmt(), originalAuditHandler.getClusterName(),
         originalAuditHandler.getClientIp()));
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java
index f4ad66a..afdf395 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java
@@ -20,8 +20,10 @@ package org.apache.impala.authorization.ranger;
 import com.google.common.base.Preconditions;
 import org.apache.impala.authorization.AuthorizationContext;
 import org.apache.impala.thrift.TSessionState;
+import org.apache.impala.util.EventSequence;
 
 import javax.annotation.Nullable;
+import java.util.Optional;
 
 /**
  * Ranger specific {@link AuthorizationContext}.
@@ -31,7 +33,9 @@ public class RangerAuthorizationContext extends AuthorizationContext {
   // Audit handler can be null meaning we don't want to do an audit log.
   private @Nullable RangerBufferAuditHandler auditHandler_;
 
-  public RangerAuthorizationContext(TSessionState sessionState) {
+  public RangerAuthorizationContext(TSessionState sessionState,
+      Optional<EventSequence> timeline) {
+    super(timeline);
     sessionState_ = sessionState;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
index 46a393f..cd0f5ff 100644
--- a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
@@ -30,6 +30,7 @@ import org.apache.impala.authorization.User;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TSessionState;
+import org.apache.impala.util.EventSequence;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Subject;
 import org.apache.sentry.core.model.db.DBModelAuthorizable;
@@ -38,6 +39,7 @@ import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -89,8 +91,8 @@ public class SentryAuthorizationChecker extends BaseAuthorizationChecker {
 
   @Override
   public AuthorizationContext createAuthorizationContext(boolean doAudits,
-      String sqlStmt, TSessionState sessionState) {
-    return new AuthorizationContext();
+      String sqlStmt, TSessionState sessionState, Optional<EventSequence> timeline) {
+    return new AuthorizationContext(timeline);
   }
 
   /*
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index d0979c0..fbc5d36 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1248,8 +1248,7 @@ public class Frontend {
     AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory_, timeline);
     AnalysisResult analysisResult =
         analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache, authzChecker_.get());
-    LOG.info("Analysis finished.");
-    timeline.markEvent("Analysis finished");
+    LOG.info("Analysis and authorization finished.");
     Preconditions.checkNotNull(analysisResult.getStmt());
 
     TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
diff --git a/fe/src/main/java/org/apache/impala/util/EventSequence.java b/fe/src/main/java/org/apache/impala/util/EventSequence.java
index 92552d1..d137208 100644
--- a/fe/src/main/java/org/apache/impala/util/EventSequence.java
+++ b/fe/src/main/java/org/apache/impala/util/EventSequence.java
@@ -41,11 +41,14 @@ public class EventSequence {
 
   /**
    * Saves an event at the current time with the given label.
+   * Returns the time elapsed since startTime_, in nanoseconds.
    */
-  public void markEvent(String label) {
+  public long markEvent(String label) {
     // Timestamps should be in ns resolution
-    timestamps_.add(System.nanoTime() - startTime_);
+    long durationNs = System.nanoTime() - startTime_;
+    timestamps_.add(durationNs);
     labels_.add(label);
+    return durationNs;
   }
 
   // For testing
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 9771fe4..71285eb 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -58,6 +59,7 @@ import org.apache.impala.testutil.ImpaladTestCatalog;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSessionState;
+import org.apache.impala.util.EventSequence;
 import org.junit.Assert;
 
 import com.google.common.base.Preconditions;
@@ -377,8 +379,9 @@ public class FrontendTestBase extends AbstractFrontendTest {
 
           @Override
           public AuthorizationContext createAuthorizationContext(boolean doAudits,
-              String sqlStmt, TSessionState sessionState) {
-            return new AuthorizationContext();
+              String sqlStmt, TSessionState sessionState,
+              Optional<EventSequence> timeline) {
+            return new AuthorizationContext(timeline);
           }
         };
       }
diff --git a/tests/observability/test_log_fragments.py b/tests/observability/test_log_fragments.py
index b3d6afc..81fbf53 100644
--- a/tests/observability/test_log_fragments.py
+++ b/tests/observability/test_log_fragments.py
@@ -42,7 +42,8 @@ class TestLogFragments(ImpalaTestSuite):
     self.execute_query("select 1")
     # Logging may be buffered, so sleep to wait out the buffering.
     time.sleep(6)
-    self.assert_impalad_log_contains('INFO', query_id + "] Analysis finished.")
+    self.assert_impalad_log_contains('INFO', query_id +
+      "] Analysis and authorization finished.")
     assert query_id.endswith("000")
     # Looks for a fragment instance that doesn't end with "0" to make sure instances
     # are getting propagated too.
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 005a661..fef2fef 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -279,7 +279,24 @@ class TestObservability(ImpalaTestSuite):
     assert results.runtime_profile.count("AGGREGATION_NODE") == 2
     assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2
 
-  def test_query_profile_contains_query_events(self):
+  def test_query_profile_contains_query_compilation_events(self):
+    """Test that the expected events show up in a query profile.
+       If the table metadata is not cached this test will fail, as the metadata load
+       creates lines dynamically."""
+    event_regexes = [r'Query Compilation:',
+        r'Metadata of all .* tables cached:',
+        r'Analysis finished:',
+        r'Authorization finished (.*):',
+        r'Value transfer graph computed:',
+        r'Single node plan created:',
+        r'Runtime filters computed:',
+        r'Distributed plan created:',
+        r'Planning finished:']
+    query = "select * from functional.alltypes"
+    runtime_profile = self.execute_query(query).runtime_profile
+    self.__verify_profile_event_sequence(event_regexes, runtime_profile)
+
+  def test_query_profile_contains_query_timeline_events(self):
     """Test that the expected events show up in a query profile."""
     event_regexes = [r'Query Timeline:',
         r'Query submitted:',