You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/13 03:33:22 UTC

[impala] branch master updated: IMPALA-9744: Treat corrupt table stats as missing to avoid bad plans

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fedf7b  IMPALA-9744: Treat corrupt table stats as missing to avoid bad plans
5fedf7b is described below

commit 5fedf7bf7247240ae1d356a393179f80b97d5cb5
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Thu Jun 18 19:31:20 2020 -0700

    IMPALA-9744: Treat corrupt table stats as missing to avoid bad plans
    
    This work addresses the current limitation in computing the total row
    count for a Hive table in a scan. The row count can be incorrectly
    computed as 0, even though there exists data in the Hive table. This
    is the stats corruption at table level. Similar stats corruption
    exists for a partition. The row count of a table or a partition
    can sometimes also be -1, which indicates missing stats.
    
    In the fix, as long as no partition in a Hive table exhibits any
    missing or corrupt stats, the total row count for the table is computed
    from the row counts in all partitions. Otherwise, Impala looks at
    the table-level stats, particularly the table row count.
    
    In addition, if the table stats are missing or corrupted, Impala
    estimates a row count for the table, if feasible. This row count is
    the sum of the row count from the partitions with good stats, and
    an estimation of the number of rows in the partitions with missing or
    corrupt stats. Such estimation also applies when some partition
    has corrupt stats.
    
    One way to observe the fix is through the explain of queries scanning
    Hive tables with missing or corrupted stats. The cardinality for any
    full scan should be a positive value (i.e. the estimated row count),
    instead of 'unavailable'.  At the beginning of the explain output,
    that table is still listed in the WARNING section for potentially
    corrupt table statistics.
    
    Testing:
    1. Ran unit tests with queries documented in the case against Hive
       tables with the following configurations:
       a. No stats corruption in any partitions
       b. Stats corruption in some partitions
       c. Stats corruption in all partitions
    2. Added two new tests in test_compute_stats.py:
       a. test_corrupted_stats_in_partitioned_Hive_tables
       b. test_corrupted_stats_in_unpartitioned_Hive_tables
    3. Fixed failures in corrupt-stats.test
    4. Ran "core" test
    
    Change-Id: I9f4c64616ff7c0b6d5a48f2b5331325feeff3576
    Reviewed-on: http://gerrit.cloudera.org:8080/16098
    Reviewed-by: Sahil Takiar <st...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/planner/HdfsScanNode.java    |  83 ++++++++------
 .../org/apache/impala/planner/PlannerTest.java     |  32 ++++--
 .../org/apache/impala/planner/PlannerTestBase.java |   9 +-
 .../java/org/apache/impala/testutil/TestUtils.java |  50 ++++++++-
 .../queries/PlannerTest/acid-scans.test            |  32 +++---
 .../PlannerTest/bloom-filter-assignment.test       |  26 ++---
 ...k-join-detection-hdfs-num-rows-est-enabled.test |   2 +-
 ...-runtime-filters-hdfs-num-rows-est-enabled.test |   2 +-
 .../PlannerTest/parquet-filtering-disabled.test    |   8 +-
 .../queries/PlannerTest/parquet-filtering.test     |  10 +-
 .../queries/PlannerTest/tablesample.test           |   2 +-
 .../queries/PlannerTest/union.test                 |   4 +-
 .../queries/QueryTest/corrupt-stats.test           |  11 +-
 .../queries/QueryTest/stats-extrapolation.test     |   4 +-
 tests/metadata/test_compute_stats.py               | 122 +++++++++++++++++++++
 tests/metadata/test_explain.py                     |  11 +-
 16 files changed, 311 insertions(+), 97 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 81b03e0..91a5e75 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1150,9 +1150,11 @@ public class HdfsScanNode extends ScanNode {
   /**
    * Computes and returns the number of rows scanned based on the per-partition row count
    * stats and/or the table-level row count stats, depending on which of those are
-   * available, and whether the table is partitioned. Partitions without stats are
-   * ignored as long as there is at least one partition with stats. Otherwise,
-   * we fall back to table-level stats even for partitioned tables.
+   * available. Partition stats are used as long as they are neither missing nor
+   * corrupted. Otherwise, we fall back to table-level stats even for partitioned tables.
+   * We further estimate the row count if the table-level stats is missing or corrupted,
+   * or some partitions are with corrupt stats. The estimation is done only for those
+   * partitions with corrupt stats.
    *
    * Sets these members:
    * numPartitionsWithNumRows_, partitionNumRows_, hasCorruptTableStats_.
@@ -1161,34 +1163,43 @@ public class HdfsScanNode extends ScanNode {
     numPartitionsWithNumRows_ = 0;
     partitionNumRows_ = -1;
     hasCorruptTableStats_ = false;
-    if (tbl_.getNumClusteringCols() > 0) {
-      for (FeFsPartition p: partitions_) {
-        // Check for corrupt partition stats
-        long partNumRows = p.getNumRows();
-        if (partNumRows < -1  || (partNumRows == 0 && p.getSize() > 0))  {
-          hasCorruptTableStats_ = true;
-        }
-        // Ignore partitions with missing stats in the hope they don't matter
-        // enough to change the planning outcome.
-        if (partNumRows > -1) {
-          if (partitionNumRows_ == -1) partitionNumRows_ = 0;
-          partitionNumRows_ = checkedAdd(partitionNumRows_, partNumRows);
-          ++numPartitionsWithNumRows_;
-        }
+
+    List<FeFsPartition> partitionsWithCorruptOrMissingStats = new ArrayList<>();
+    for (FeFsPartition p : partitions_) {
+      long partNumRows = p.getNumRows();
+      // Check for corrupt stats
+      if (partNumRows < -1 || (partNumRows == 0 && p.getSize() > 0)) {
+        hasCorruptTableStats_ = true;
+        partitionsWithCorruptOrMissingStats.add(p);
+      } else if (partNumRows == -1) { // Check for missing stats
+        partitionsWithCorruptOrMissingStats.add(p);
+      } else if (partNumRows > -1) {
+        // Consider partition with good stats.
+        if (partitionNumRows_ == -1) partitionNumRows_ = 0;
+        partitionNumRows_ = checkedAdd(partitionNumRows_, partNumRows);
+        ++numPartitionsWithNumRows_;
       }
-      if (numPartitionsWithNumRows_ > 0) return partitionNumRows_;
     }
-    // Table is unpartitioned or the table is partitioned but no partitions have stats.
+    // If all partitions have good stats, return the total row count contributed
+    // by each of the partitions, as the row count for the table.
+    if (partitionsWithCorruptOrMissingStats.size() == 0
+        && numPartitionsWithNumRows_ > 0) {
+      return partitionNumRows_;
+    }
+
     // Set cardinality based on table-level stats.
     long numRows = tbl_.getNumRows();
     // Depending on the query option of disable_hdfs_num_rows_est, if numRows
-    // is still not available, we provide a crude estimation by computing
-    // sumAvgRowSizes, the sum of the slot size of each column of scalar type,
-    // and then generate the estimate using sumValues(totalBytesPerFs_), the size of
-    // the hdfs table.
-    if (!queryOptions.disable_hdfs_num_rows_estimate && numRows == -1L) {
-      // Compute the estimated table size when taking compression into consideration
-      long estimatedTableSize = computeEstimatedTableSize();
+    // is still not available (-1), or partition stats is corrupted, we provide
+    // a crude estimation by computing sumAvgRowSizes, the sum of the slot
+    // size of each column of scalar type, and then generate the estimate using
+    // sumValues(totalBytesPerFs_), the size of the hdfs table.
+    if (!queryOptions.disable_hdfs_num_rows_estimate
+        && (numRows == -1L || hasCorruptTableStats_)) {
+      // Compute the estimated table size from those partitions with missing or corrupt
+      // row count, when taking compression into consideration
+      long estimatedTableSize =
+          computeEstimatedTableSize(partitionsWithCorruptOrMissingStats);
 
       double sumAvgRowSizes = 0.0;
       for (Column col : tbl_.getColumns()) {
@@ -1202,25 +1213,35 @@ public class HdfsScanNode extends ScanNode {
         }
       }
 
+      long estNumRows = 0;
       if (sumAvgRowSizes == 0.0) {
         // When the type of each Column is of ArrayType or MapType,
         // sumAvgRowSizes would be equal to 0. In this case, we use a ultimate
         // fallback row width if sumAvgRowSizes == 0.0.
-        numRows = Math.round(estimatedTableSize / DEFAULT_ROW_WIDTH_ESTIMATE);
+        estNumRows = Math.round(estimatedTableSize / DEFAULT_ROW_WIDTH_ESTIMATE);
       } else {
-        numRows = Math.round(estimatedTableSize / sumAvgRowSizes);
+        estNumRows = Math.round(estimatedTableSize / sumAvgRowSizes);
       }
+
+      // Include the row count contributed by partitions with good stats (if any).
+      numRows = partitionNumRows_ =
+          (partitionNumRows_ > 0) ? partitionNumRows_ + estNumRows : estNumRows;
     }
+
     if (numRows < -1 || (numRows == 0 && tbl_.getTotalHdfsBytes() > 0)) {
       hasCorruptTableStats_ = true;
     }
+
     return numRows;
   }
 
-  /** Compute the estimated table size when taking compression into consideration */
-  private long computeEstimatedTableSize() {
+  /**
+   * Compute the estimated table size for the partitions contained in
+   * the partitions argument when taking compression into consideration
+   */
+  private long computeEstimatedTableSize(List<FeFsPartition> partitions) {
     long estimatedTableSize = 0;
-    for (FeFsPartition p: partitions_) {
+    for (FeFsPartition p : partitions) {
       HdfsFileFormat format = p.getFileFormat();
       long estimatedPartitionSize = 0;
       if (format == HdfsFileFormat.TEXT) {
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index f720873..1d03e98 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -325,7 +325,8 @@ public class PlannerTest extends PlannerTestBase {
     addTestTable("create table test_hdfs_insert_writer_limit.unpartitioned_table"
         + " (id int) location '/'");
     runPlannerTestFile("insert-hdfs-writer-limit", "test_hdfs_insert_writer_limit",
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -383,8 +384,9 @@ public class PlannerTest extends PlannerTestBase {
     // The FK/PK detection result is included in EXTENDED or higher.
     TQueryOptions options = defaultQueryOptions();
     options.setDisable_hdfs_num_rows_estimate(false);
-    runPlannerTestFile("fk-pk-join-detection-hdfs-num-rows-est-enabled",
-        options, ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+    runPlannerTestFile("fk-pk-join-detection-hdfs-num-rows-est-enabled", options,
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -608,7 +610,9 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testBloomFilterAssignment() {
-    runPlannerTestFile("bloom-filter-assignment");
+    runPlannerTestFile("bloom-filter-assignment",
+        ImmutableSet.of(
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -624,7 +628,8 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testParquetFiltering() {
     runPlannerTestFile("parquet-filtering",
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -633,7 +638,8 @@ public class PlannerTest extends PlannerTestBase {
     options.setParquet_dictionary_filtering(false);
     options.setParquet_read_statistics(false);
     runPlannerTestFile("parquet-filtering-disabled", options,
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -798,7 +804,8 @@ public class PlannerTest extends PlannerTestBase {
   public void testTableSample() {
     TQueryOptions options = defaultQueryOptions();
     runPlannerTestFile("tablesample", options,
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -859,7 +866,9 @@ public class PlannerTest extends PlannerTestBase {
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setDisable_hdfs_num_rows_estimate(false);
     options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.MIN_MAX);
-    runPlannerTestFile("min-max-runtime-filters-hdfs-num-rows-est-enabled", options);
+    runPlannerTestFile("min-max-runtime-filters-hdfs-num-rows-est-enabled", options,
+        ImmutableSet.of(
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -1013,6 +1022,9 @@ public class PlannerTest extends PlannerTestBase {
     filter = TestUtils.ROW_SIZE_FILTER;
     assertEquals(" row-size= cardinality=10.3K",
         filter.transform(" row-size=10B cardinality=10.3K"));
+    filter = TestUtils.PARTITIONS_FILTER;
+    assertEquals(" partitions: 0/24 rows=",
+        filter.transform(" partitions: 0/24 rows=10.3K"));
   }
 
   @Test
@@ -1083,7 +1095,9 @@ public class PlannerTest extends PlannerTestBase {
    */
   @Test
   public void testAcidTableScans() {
-    runPlannerTestFile("acid-scans", "functional_orc_def");
+    runPlannerTestFile("acid-scans", "functional_orc_def",
+        ImmutableSet.of(
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index a675a75..afcfa11 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -540,6 +540,10 @@ public class PlannerTestBase extends FrontendTestBase {
       if (!testOptions.contains(PlannerTestOption.VALIDATE_SCAN_FS)) {
         resultFilters.add(TestUtils.SCAN_NODE_SCHEME_FILTER);
       }
+      if (testOptions.contains(
+              PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS)) {
+        resultFilters.add(TestUtils.PARTITIONS_FILTER);
+      }
 
       String planDiff = TestUtils.compareOutput(
           Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, resultFilters);
@@ -830,7 +834,10 @@ public class PlannerTestBase extends FrontendTestBase {
     VALIDATE_SCAN_FS,
     // If set, disables the attempt to compute an estimated number of rows in an
     // hdfs table.
-    DISABLE_HDFS_NUM_ROWS_ESTIMATE
+    DISABLE_HDFS_NUM_ROWS_ESTIMATE,
+    // If set, make no attempt to validate the estimated number of rows for any
+    // partitions in an hdfs table.
+    DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS
   }
 
   protected void runPlannerTestFile(String testFile, TQueryOptions options) {
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index f07e239..c7fde3e 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -113,20 +113,31 @@ public class TestUtils {
    */
   public static class IgnoreValueFilter implements ResultFilter {
     // Literal string containing the key name.
-    private final String keyPrefix;
-    private final String valueRegex;
+    protected final String keyPrefix;
+    protected final String valueRegex;
 
     /**
      * Create a filter that ignores the value from key value pairs where the key is
-     * the literal 'key' value and the value matches 'valueRegex'.
+     * the literal 'key' value and the value matches 'valueRegex'. The key and the
+     * value are separated by the 'separator' character.
      */
-    public IgnoreValueFilter(String key, String valueRegex) {
+    public IgnoreValueFilter(
+        String key, String valueRegex, char separator) {
       // Include leading space to avoid matching partial keys, e.g. if key is "bar" we
       // don't want to match "foobar=".
-      this.keyPrefix = " " + key + "=";
+      this.keyPrefix = " " + key + Character.toString(separator);
       this.valueRegex = valueRegex;
     }
 
+    /**
+     *  Create a filter that ignores the value from key value pairs where the key is
+     *  the literal 'key' value and the value matches 'valueRegex'. The key and the
+     *  value are separated by '='.
+     */
+    public IgnoreValueFilter(String key, String valueRegex) {
+      this(key, valueRegex, '=');
+    }
+
     @Override
     public boolean matches(String input) { return input.contains(keyPrefix); }
 
@@ -137,6 +148,30 @@ public class TestUtils {
   }
 
   /**
+   * Filter to replace the value from elements in the format key=value.
+   */
+  public static class ReplaceValueFilter extends IgnoreValueFilter {
+    // Literal string containing the replacement regex.
+    private final String replaceRegex;
+
+    /**
+     * Create a filter that replaces the value from key value pairs where the key is
+     * the literal 'key' value and the value matches 'valueRegex'. The key and the
+     * value are separated by the 'separator' character.
+     */
+    public ReplaceValueFilter(
+        String key, String valueRegex, String replaceRegex, char separator) {
+      super(key, valueRegex, separator);
+      this.replaceRegex = replaceRegex;
+    }
+
+    @Override
+    public String transform(String input) {
+      return input.replaceAll(keyPrefix + valueRegex, keyPrefix + replaceRegex);
+    }
+  }
+
+  /**
    * Filter to ignore the filesystem schemes in the scan node explain output. See
    * {@link org.apache.impala.planner.PlannerTestBase.PlannerTestOption#VALIDATE_SCAN_FS}
    * for more details.
@@ -176,6 +211,11 @@ public class TestUtils {
   public static final IgnoreValueFilter CARDINALITY_FILTER =
       new IgnoreValueFilter("cardinality", "\\S+");
 
+  // Ignore any values after 'rows=' in partitions: 0/24 rows=12.83K or
+  // partitions: 0/24 rows=unavailable entries
+  public static final ReplaceValueFilter PARTITIONS_FILTER =
+      new ReplaceValueFilter("partitions", "( \\d+/\\d+ rows=)\\S+", "$1", ':');
+
   // Ignore the exact estimated row count, which depends on the file sizes.
   static IgnoreValueFilter SCAN_RANGE_ROW_COUNT_FILTER =
       new IgnoreValueFilter("max-scan-range-rows", PrintUtils.METRIC_REGEX);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
index ab982e1..54dfae7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
@@ -292,7 +292,7 @@ PLAN-ROOT SINK
 |  |     HDFS partitions=6/24 files=6 size=6.58KB
 |  |     stored statistics:
 |  |       table: rows=unavailable size=unavailable
-|  |       partitions: 0/6 rows=unavailable
+|  |       partitions: 0/6 rows=413
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -303,7 +303,7 @@ PLAN-ROOT SINK
 |     HDFS partitions=24/24 files=24 size=54.09KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=3.42K
 |       columns missing stats: id
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -320,7 +320,7 @@ PLAN-ROOT SINK
 |     HDFS partitions=6/24 files=6 size=6.58KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/6 rows=unavailable
+|       partitions: 0/6 rows=413
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -332,7 +332,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=3.42K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=64.00MB mem-reservation=112.00KB thread-reservation=1
@@ -404,7 +404,7 @@ Per-Host Resources: mem-estimate=68.94MB mem-reservation=4.98MB thread-reservati
 |  |     HDFS partitions=6/24 files=6 size=6.58KB
 |  |     stored statistics:
 |  |       table: rows=unavailable size=unavailable
-|  |       partitions: 0/6 rows=unavailable
+|  |       partitions: 0/6 rows=413
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -415,7 +415,7 @@ Per-Host Resources: mem-estimate=68.94MB mem-reservation=4.98MB thread-reservati
 |     HDFS partitions=24/24 files=24 size=54.09KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=3.42K
 |       columns missing stats: id
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -439,7 +439,7 @@ Per-Host Resources: mem-estimate=68.94MB mem-reservation=4.98MB thread-reservati
 |     HDFS partitions=6/24 files=6 size=6.58KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/6 rows=unavailable
+|       partitions: 0/6 rows=413
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -451,7 +451,7 @@ Per-Host Resources: mem-estimate=68.94MB mem-reservation=4.98MB thread-reservati
    runtime filters: RF000[bloom] -> id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=3.42K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=64.00MB mem-reservation=112.00KB thread-reservation=1
@@ -490,7 +490,7 @@ Per-Host Resources: mem-estimate=212.88MB mem-reservation=6.89MB thread-reservat
   |  |     HDFS partitions=6/24 files=6 size=6.58KB
   |  |     stored statistics:
   |  |       table: rows=unavailable size=unavailable
-  |  |       partitions: 0/6 rows=unavailable
+  |  |       partitions: 0/6 rows=413
   |  |       columns: all
   |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
   |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -501,7 +501,7 @@ Per-Host Resources: mem-estimate=212.88MB mem-reservation=6.89MB thread-reservat
   |     HDFS partitions=24/24 files=24 size=54.09KB
   |     stored statistics:
   |       table: rows=unavailable size=unavailable
-  |       partitions: 0/24 rows=unavailable
+  |       partitions: 0/24 rows=3.42K
   |       columns: all
   |     extrapolated-rows=disabled max-scan-range-rows=unavailable
   |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -518,7 +518,7 @@ Per-Host Resources: mem-estimate=212.88MB mem-reservation=6.89MB thread-reservat
   |     HDFS partitions=6/24 files=6 size=6.58KB
   |     stored statistics:
   |       table: rows=unavailable size=unavailable
-  |       partitions: 0/6 rows=unavailable
+  |       partitions: 0/6 rows=413
   |       columns: all
   |     extrapolated-rows=disabled max-scan-range-rows=unavailable
   |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -530,7 +530,7 @@ Per-Host Resources: mem-estimate=212.88MB mem-reservation=6.89MB thread-reservat
      runtime filters: RF000[bloom] -> t1.id % 12
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/24 rows=unavailable
+       partitions: 0/24 rows=3.42K
        columns missing stats: id
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -590,7 +590,7 @@ Per-Host Resources: mem-estimate=66.99MB mem-reservation=2.97MB thread-reservati
      runtime filters: RF000[bloom] -> t1.id % 12
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/24 rows=unavailable
+       partitions: 0/24 rows=3.42K
        columns missing stats: id
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -605,7 +605,7 @@ Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservat
      HDFS partitions=6/24 files=6 size=6.58KB
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/6 rows=unavailable
+       partitions: 0/6 rows=413
        columns: all
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -631,7 +631,7 @@ Per-Host Resources: mem-estimate=49.99MB mem-reservation=1.96MB thread-reservati
      HDFS partitions=24/24 files=24 size=54.09KB
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/24 rows=unavailable
+       partitions: 0/24 rows=3.42K
        columns: all
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -646,7 +646,7 @@ Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservat
      HDFS partitions=6/24 files=6 size=6.58KB
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/6 rows=unavailable
+       partitions: 0/6 rows=413
        columns: all
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
index ab0aa15..9e23d9f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
@@ -30,7 +30,7 @@ PLAN-ROOT SINK
 |     HDFS partitions=24/24 files=24 size=201.11KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -42,7 +42,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> a.id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -126,7 +126,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> a.id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -165,7 +165,7 @@ PLAN-ROOT SINK
 |     HDFS partitions=24/24 files=24 size=201.11KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -216,7 +216,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> a.id + 1
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -254,7 +254,7 @@ PLAN-ROOT SINK
 |     HDFS partitions=24/24 files=24 size=201.11KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -301,7 +301,7 @@ PLAN-ROOT SINK
 |     predicates: c.id < CAST(100 AS INT)
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     parquet statistics predicates: c.id < CAST(100 AS INT)
@@ -329,7 +329,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> a.int_col, RF002[bloom] -> a.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -372,7 +372,7 @@ PLAN-ROOT SINK
 |     predicates: c.id < CAST(100 AS INT)
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     parquet statistics predicates: c.id < CAST(100 AS INT)
@@ -399,7 +399,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> a.int_col + 1, RF002[bloom] -> a.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -442,7 +442,7 @@ PLAN-ROOT SINK
 |     predicates: c.id < CAST(100 AS INT)
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     parquet statistics predicates: c.id < CAST(100 AS INT)
@@ -470,7 +470,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> a.timestamp_col, RF003[bloom] -> a.timestamp_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -533,7 +533,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> a.timestamp_col, RF003[bloom] -> a.timestamp_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
index e144a2c..0b8eabb 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
@@ -79,7 +79,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> ss_customer_sk
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/1824 rows=unavailable
+     partitions: 0/1824 rows=8.07M
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=128.00MB mem-reservation=8.00MB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
index 9210208..c4b691d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
@@ -51,7 +51,7 @@ PLAN-ROOT SINK
    runtime filters: RF000[bloom] -> b.int_col, RF002[bloom] -> b.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
index d54b4ec..6b53ede 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
@@ -25,7 +25,7 @@ PLAN-ROOT SINK
    predicates: int_col IS NULL, int_col > CAST(1 AS INT), int_col > CAST(tinyint_col AS INT), CAST(int_col AS DOUBLE) * rand() > CAST(50 AS DOUBLE)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
@@ -64,7 +64,7 @@ PLAN-ROOT SINK
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > ' [...]
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/22 rows=unavailable
+     partitions: 0/22 rows=11.74K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
@@ -95,7 +95,7 @@ PLAN-ROOT SINK
    predicates: id IN (int_col), id NOT IN (CAST(0 AS INT), CAST(1 AS INT), CAST(2 AS INT)), int_col % CAST(50 AS INT) IN (CAST(0 AS INT), CAST(1 AS INT)), string_col IN ('aaaa', 'bbbb', 'cccc', NULL)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -346,7 +346,7 @@ PLAN-ROOT SINK
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > ' [...]
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/4 rows=unavailable
+     partitions: 0/4 rows=2.55K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 7b94c17..90792b8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -24,7 +24,7 @@ PLAN-ROOT SINK
    predicates: int_col IS NULL, int_col > CAST(1 AS INT), int_col > CAST(tinyint_col AS INT), CAST(int_col AS DOUBLE) * rand() > CAST(50 AS DOUBLE)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: int_col > CAST(1 AS INT)
@@ -89,7 +89,7 @@ PLAN-ROOT SINK
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > ' [...]
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/22 rows=unavailable
+     partitions: 0/22 rows=11.74K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), date_string_col > '1993-10-01'
@@ -159,7 +159,7 @@ PLAN-ROOT SINK
    predicates: id IN (int_col), id NOT IN (CAST(0 AS INT), CAST(1 AS INT), CAST(2 AS INT)), int_col % CAST(50 AS INT) IN (CAST(0 AS INT), CAST(1 AS INT)), string_col IN ('aaaa', 'bbbb', 'cccc', NULL)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet dictionary predicates: id NOT IN (CAST(0 AS INT), CAST(1 AS INT), CAST(2 AS INT)), int_col % CAST(50 AS INT) IN (CAST(0 AS INT), CAST(1 AS INT)), string_col IN ('aaaa', 'bbbb', 'cccc', NULL)
@@ -600,7 +600,7 @@ PLAN-ROOT SINK
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > ' [...]
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/4 rows=unavailable
+     partitions: 0/4 rows=2.55K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), date_string_col > '1993-10-01'
@@ -637,7 +637,7 @@ PLAN-ROOT SINK
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > ' [...]
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/0 rows=unavailable
+     partitions: 0/0 rows=0
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=0
    mem-estimate=0B mem-reservation=0B thread-reservation=0
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 22d9144..2ff75a2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -178,7 +178,7 @@ PLAN-ROOT SINK
    HDFS partitions=3/24 files=3 size=25.50KB
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/union.test b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
index 396f85f..818367b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/union.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
@@ -4152,7 +4152,7 @@ Per-Host Resources: mem-estimate=128.00MB mem-reservation=32.00KB thread-reserva
    partitions=0/0 files=0 size=0B
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/0 rows=unavailable
+     partitions: 0/0 rows=0
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=0
    mem-estimate=0B mem-reservation=0B thread-reservation=0
@@ -4190,7 +4190,7 @@ Per-Host Resources: mem-estimate=128.00MB mem-reservation=32.00KB thread-reserva
 |     partitions=0/0 files=0 size=0B
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/0 rows=unavailable
+|       partitions: 0/0 rows=0
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=0
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
diff --git a/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test b/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test
index 00af63e..6f2c696 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test
@@ -59,15 +59,18 @@ explain select count(*) from corrupted where org = 1;
 ''
 '03:AGGREGATE [FINALIZE]'
 '|  output: count:merge(*)'
+'|  row-size=8B cardinality=1'
 '|'
 '02:EXCHANGE [UNPARTITIONED]'
 '|'
 '01:AGGREGATE'
 '|  output: count(*)'
+'|  row-size=8B cardinality=1'
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted]'
+'   partition predicates: org = 1'
 '   $FILESYSTEM_NAME partitions=1/2 files=1 size=24B'
-'   row-size=0B cardinality=0'
+'   row-size=0B cardinality=1'
 ---- TYPES
 STRING
 ====
@@ -143,7 +146,7 @@ explain select count(*) from corrupted where org = 2;
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted]'
 '   $FILESYSTEM_NAME partitions=1/2 files=1 size=24B'
-'   row-size=0B cardinality=6'
+'   row-size=0B cardinality=1'
 ---- TYPES
 STRING
 ====
@@ -203,7 +206,7 @@ explain select count(*) from corrupted_no_part;
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted_no_part]'
 '   $FILESYSTEM_NAME partitions=1/1 files=1 size=6B'
-'   row-size=0B cardinality=0'
+'   row-size=0B cardinality=2'
 ---- TYPES
 STRING
 ====
@@ -226,7 +229,7 @@ explain select count(*) from corrupted_no_part;
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted_no_part]'
 '   $FILESYSTEM_NAME partitions=1/1 files=1 size=6B'
-'   row-size=0B cardinality=unavailable'
+'   row-size=0B cardinality=2'
 ---- TYPES
 STRING
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 7efb68d..0f445b2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -11,7 +11,7 @@ explain select id from alltypes;
 ---- RESULTS: VERIFY_IS_SUBSET
 '   stored statistics:'
 '     table: rows=unavailable size=unavailable'
-'     partitions: 0/12 rows=unavailable'
+'     partitions: 0/12 rows=5.97K'
 '     columns: unavailable'
 row_regex:.* extrapolated-rows=unavailable.*
 '   tuple-ids=0 row-size=4B cardinality=5.97K'
@@ -218,7 +218,7 @@ explain select id from alltypes;
 ---- RESULTS: VERIFY_IS_SUBSET
 '   stored statistics:'
 '     table: rows=unavailable size=unavailable'
-'     partitions: 0/24 rows=unavailable'
+'     partitions: 0/24 rows=17.91K'
 '     columns: unavailable'
 row_regex:.* extrapolated-rows=unavailable.*
 row_regex:.* tuple-ids=0 row-size=4B cardinality=17\.9.*K
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py
index 70e99f5..5455d60 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -29,6 +29,8 @@ from tests.common.test_dimensions import (
     create_uncompressed_text_dimension)
 from CatalogObjects.ttypes import THdfsCompression
 
+import os
+
 
 IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()
 
@@ -170,6 +172,126 @@ class TestComputeStats(ImpalaTestSuite):
     assert(len(show_result.data) == 2)
     assert("1\tpval\t8" in show_result.data[0])
 
+  @staticmethod
+  def create_load_test_corrupt_stats(self, unique_database, create_load_stmts,
+          table_name, partitions, files):
+    """A helper method for tests against the fix to IMPALA-9744."""
+    # Create and load the Hive table.
+    self.run_stmt_in_hive(create_load_stmts)
+
+    # Make the table visible in Impala.
+    self.execute_query("invalidate metadata %s.%s" % (unique_database, table_name))
+
+    # Formulate a simple query that scans the Hive table.
+    explain_stmt = """
+    explain select * from {0}.{1} where
+    int_col > (select 3*stddev(int_col) from {0}.{1})
+    """.format(unique_database, table_name)
+    explain_result = self.execute_query(explain_stmt)
+
+    # Formulate a template which verifies the number of partitions and the number
+    # of files are per spec.
+    hdfs_physical_properties_template \
+      = """HDFS partitions={0}/{0} files={1}""".format(partitions, files)
+
+    # Check that the template formulated above exists and row count of the table is
+    # not zero, for all scans.
+    for i in xrange(len(explain_result.data)):
+      if ("SCAN HDFS" in explain_result.data[i]):
+         assert(hdfs_physical_properties_template in explain_result.data[i + 1])
+         assert("cardinality=0" not in explain_result.data[i + 2])
+
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  def test_corrupted_stats_in_partitioned_hive_tables(self, vector, unique_database):
+    """IMPALA-9744: Tests that the partition stats corruption in Hive tables
+    (row count=0, partition size>0, persisted when the data was loaded with
+    hive.stats.autogather=true) is handled at the table scan level.
+    """
+    # Unless something drastic changes in Hive and/or Impala, this test should
+    # always succeed; it is therefore run only under the exhaustive strategy.
+    if self.exploration_strategy() != 'exhaustive': pytest.skip()
+
+    # Load from a local data file
+    local_file = os.path.join(os.environ['IMPALA_HOME'],
+                 "testdata/data/alltypes_tiny_pages.parquet")
+    table_name = "partitioned_table_with_corrupted_and_missing_stats"
+
+    # Setting hive.stats.autogather=true after CREATE TABLE DDL but before LOAD DML
+    # minimally reproduces the corrupt stats issue.
+    create_load_stmts = """
+      CREATE TABLE {0}.{1} (
+        id int COMMENT 'Add a comment',
+        bool_col boolean,
+        tinyint_col tinyint,
+        smallint_col smallint,
+        int_col int,
+        bigint_col bigint,
+        float_col float,
+        double_col double,
+        date_string_col string,
+        string_col string,
+        timestamp_col timestamp,
+        year int,
+        month int )
+        PARTITIONED BY (decade string)
+        STORED AS PARQUET;
+      set hive.stats.autogather=true;
+      load data local inpath '{2}' into table {0}.{1} partition (decade="corrupt-stats");
+      set hive.stats.autogather=false;
+      load data local inpath '{2}' into table {0}.{1} partition (decade="missing-stats");
+    """.format(unique_database, table_name, local_file)
+
+    self.create_load_test_corrupt_stats(self, unique_database, create_load_stmts,
+            table_name, 2, 2)
+
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  def test_corrupted_stats_in_unpartitioned_hive_tables(self, vector, unique_database):
+    """IMPALA-9744: Tests that the stats corruption in unpartitioned Hive
+    tables (row count=0, partition size>0, persisted when the data was loaded
+    with hive.stats.autogather=true) is handled at the table scan level.
+    """
+    # Unless something drastic changes in Hive and/or Impala, this test should
+    # always succeed; it is therefore run only under the exhaustive strategy.
+    if self.exploration_strategy() != 'exhaustive': pytest.skip()
+
+    # Load from a local data file
+    local_file = os.path.join(os.environ['IMPALA_HOME'],
+                 "testdata/data/alltypes_tiny_pages.parquet")
+    table_name = "nonpartitioned_table_with_corrupted_stats"
+
+    # Setting hive.stats.autogather=true prior to CREATE TABLE DDL minimally
+    # reproduces the corrupt stats issue.
+    create_load_stmts = """
+      set hive.stats.autogather=true;
+      CREATE TABLE {0}.{1} (
+        id int COMMENT 'Add a comment',
+        bool_col boolean,
+        tinyint_col tinyint,
+        smallint_col smallint,
+        int_col int,
+        bigint_col bigint,
+        float_col float,
+        double_col double,
+        date_string_col string,
+        string_col string,
+        timestamp_col timestamp,
+        year int,
+        month int)
+        STORED AS PARQUET;
+      load data local inpath '{2}' into table {0}.{1};
+    """.format(unique_database, table_name, local_file)
+
+    self.create_load_test_corrupt_stats(self, unique_database, create_load_stmts,
+            table_name, 1, 1)
+
   @SkipIfS3.eventually_consistent
   @SkipIfCatalogV2.stats_pulling_disabled()
   def test_pull_stats_profile(self, vector, unique_database):
diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py
index 9f2d61b..bc408db 100644
--- a/tests/metadata/test_explain.py
+++ b/tests/metadata/test_explain.py
@@ -129,13 +129,20 @@ class TestExplain(ImpalaTestSuite):
     result = self.execute_query("explain select * from %s where p = 1" % mixed_tbl,
         query_options={'explain_level':3})
     check_cardinality(result.data, '100')
+    # Set the number of rows at the table level to -1.
+    self.execute_query(
+      "alter table %s set tblproperties('numRows'='-1')" % mixed_tbl)
     # Set the number of rows for a single partition.
     self.execute_query(
       "alter table %s partition(p=1) set tblproperties('numRows'='50')" % mixed_tbl)
-    # Use partition stats when availabe. Partitions without stats are ignored.
+    # Use partition stats when available. Row counts for partitions without
+    # stats are estimated.
     result = self.execute_query("explain select * from %s" % mixed_tbl,
         query_options={'explain_level':3})
-    check_cardinality(result.data, '50')
+    check_cardinality(result.data, '51')
+    # Set the number of rows at the table level back to 100.
+    self.execute_query(
+      "alter table %s set tblproperties('numRows'='100')" % mixed_tbl)
     # Fall back to table-level stats when no selected partitions have stats.
     result = self.execute_query("explain select * from %s where p = 2" % mixed_tbl,
         query_options={'explain_level':3})