You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2017/05/26 21:15:13 UTC

[1/2] incubator-impala git commit: IMPALA-2373: Extrapolate row counts for HDFS tables.

Repository: incubator-impala
Updated Branches:
  refs/heads/master bad10da4a -> e89d7057a


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index 32171fb..58ea01f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -30,7 +30,8 @@ from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 '|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2'
 '|  01:SCAN HDFS [tpch.orders, RANDOM]'
 row_regex:.*partitions=1/1 files=1 size=.*
-'|     table stats: 1500000 rows total'
+'|     stats-rows=1500000 extrapolated-rows=disabled'
+'|     table stats: rows=1500000 size=162.56MB'
 '|     column stats: all'
 '|     mem-estimate=88.00MB mem-reservation=0B'
 '|     tuple-ids=1 row-size=191B cardinality=1500000'
@@ -38,7 +39,8 @@ row_regex:.*partitions=1/1 files=1 size=.*
 '00:SCAN HDFS [tpch.lineitem, RANDOM]'
 row_regex:.*partitions=1/1 files=1 size=.*
 '   runtime filters: RF000 -> l_orderkey'
-'   table stats: 6001215 rows total'
+'   stats-rows=6001215 extrapolated-rows=disabled'
+'   table stats: rows=6001215 size=718.94MB'
 '   column stats: all'
 '   mem-estimate=88.00MB mem-reservation=0B'
 '   tuple-ids=0 row-size=263B cardinality=6001215'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
index f1ff4a8..603544e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
@@ -32,7 +32,8 @@ from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 '  00:SCAN HDFS [tpch.lineitem, RANDOM]'
 row_regex:.*partitions=1/1 files=1 size=.*
 '     runtime filters: RF000 -> l_orderkey'
-'     table stats: 6001215 rows total'
+'     stats-rows=6001215 extrapolated-rows=disabled'
+'     table stats: rows=6001215 size=718.94MB'
 '     column stats: all'
 '     mem-estimate=88.00MB mem-reservation=0B'
 '     tuple-ids=0 row-size=263B cardinality=6001215'
@@ -42,7 +43,8 @@ row_regex:.*partitions=1/1 files=1 size=.*
 '  |  mem-estimate=0B mem-reservation=0B'
 '  01:SCAN HDFS [tpch.orders, RANDOM]'
 row_regex:.*partitions=1/1 files=1 size=.*
-'     table stats: 1500000 rows total'
+'     stats-rows=1500000 extrapolated-rows=disabled'
+'     table stats: rows=1500000 size=162.56MB'
 '     column stats: all'
 '     mem-estimate=88.00MB mem-reservation=0B'
 '     tuple-ids=1 row-size=191B cardinality=1500000'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
new file mode 100644
index 0000000..c22095b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -0,0 +1,148 @@
+====
+---- QUERY
+create table alltypes like functional_parquet.alltypes;
+insert into alltypes partition(year, month)
+select * from functional_parquet.alltypes where year = 2009;
+====
+---- QUERY
+# No stats are available.
+explain select id from alltypes;
+---- RESULTS: VERIFY_IS_SUBSET
+'   stats-rows=unavailable extrapolated-rows=unavailable'
+'   table stats: rows=unavailable size=unavailable'
+'   column stats: unavailable'
+'   mem-estimate=16.00MB mem-reservation=0B'
+'   tuple-ids=0 row-size=4B cardinality=unavailable'
+---- TYPES
+STRING
+====
+---- QUERY
+compute stats alltypes
+---- RESULTS
+'Updated 12 partition(s) and 11 column(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+# Stats are available now.
+explain select id from alltypes;
+---- RESULTS: VERIFY_IS_EQUAL
+'Per-Host Resource Reservation: Memory=0B'
+'Per-Host Resource Estimates: Memory=16.00MB'
+''
+'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
+'PLAN-ROOT SINK'
+'|  mem-estimate=0B mem-reservation=0B'
+'|'
+'00:SCAN HDFS [$DATABASE.alltypes]'
+row_regex:.*partitions=12/12 files=12 size=.*
+'   stats-rows=3650 extrapolated-rows=3650'
+row_regex:.*table stats: rows=3650 size=.*
+'   column stats: all'
+'   mem-estimate=16.00MB mem-reservation=0B'
+'   tuple-ids=0 row-size=4B cardinality=3650'
+---- TYPES
+STRING
+====
+---- QUERY
+# Select a subset of partitions.
+explain select id from alltypes where month in (1, 2, 3);
+---- RESULTS: VERIFY_IS_EQUAL
+'Per-Host Resource Reservation: Memory=0B'
+'Per-Host Resource Estimates: Memory=16.00MB'
+''
+'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
+'PLAN-ROOT SINK'
+'|  mem-estimate=0B mem-reservation=0B'
+'|'
+'00:SCAN HDFS [$DATABASE.alltypes]'
+row_regex:.*partitions=3/12 files=3 size=.*
+'   stats-rows=900 extrapolated-rows=904'
+row_regex:.*table stats: rows=3650 size=.*
+'   column stats: all'
+'   mem-estimate=16.00MB mem-reservation=0B'
+'   tuple-ids=0 row-size=4B cardinality=904'
+---- TYPES
+STRING
+====
+---- QUERY
+# Double the data in existing partitions.
+insert into alltypes partition(year, month)
+select * from functional_parquet.alltypes where year = 2009;
+explain select id from alltypes;
+---- RESULTS: VERIFY_IS_EQUAL
+'Per-Host Resource Reservation: Memory=0B'
+'Per-Host Resource Estimates: Memory=16.00MB'
+''
+'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
+'PLAN-ROOT SINK'
+'|  mem-estimate=0B mem-reservation=0B'
+'|'
+'00:SCAN HDFS [$DATABASE.alltypes]'
+row_regex:.*partitions=12/12 files=24 size=.*
+'   stats-rows=3650 extrapolated-rows=7300'
+row_regex:.*table stats: rows=3650 size=.*
+'   column stats: all'
+'   mem-estimate=16.00MB mem-reservation=0B'
+'   tuple-ids=0 row-size=4B cardinality=7300'
+---- TYPES
+STRING
+====
+---- QUERY
+# Create new partitions and extrapolate their row count.
+insert into alltypes partition(year, month)
+select * from functional_parquet.alltypes where year = 2010;
+explain select id from alltypes where year = 2010;
+---- RESULTS: VERIFY_IS_EQUAL
+'Per-Host Resource Reservation: Memory=0B'
+'Per-Host Resource Estimates: Memory=16.00MB'
+''
+'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
+'PLAN-ROOT SINK'
+'|  mem-estimate=0B mem-reservation=0B'
+'|'
+'00:SCAN HDFS [$DATABASE.alltypes]'
+row_regex:.*partitions=12/24 files=12 size=.*
+'   stats-rows=unavailable extrapolated-rows=3651'
+row_regex:.*table stats: rows=3650 size=.*
+'   column stats: all'
+'   mem-estimate=16.00MB mem-reservation=0B'
+'   tuple-ids=0 row-size=4B cardinality=3651'
+---- TYPES
+STRING
+====
+---- QUERY
+# Compute stats and run the same query again.
+compute stats alltypes;
+explain select id from alltypes where year = 2010;
+---- RESULTS: VERIFY_IS_EQUAL
+'Per-Host Resource Reservation: Memory=0B'
+'Per-Host Resource Estimates: Memory=16.00MB'
+''
+'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
+'PLAN-ROOT SINK'
+'|  mem-estimate=0B mem-reservation=0B'
+'|'
+'00:SCAN HDFS [$DATABASE.alltypes]'
+row_regex:.*partitions=12/24 files=12 size=.*
+'   stats-rows=3650 extrapolated-rows=3651'
+row_regex:.*table stats: rows=10950 size=.*
+'   column stats: all'
+'   mem-estimate=16.00MB mem-reservation=0B'
+'   tuple-ids=0 row-size=4B cardinality=3651'
+---- TYPES
+STRING
+====
+---- QUERY
+# Test that dropping stats resets everything.
+drop stats alltypes;
+explain select id from alltypes;
+---- RESULTS: VERIFY_IS_SUBSET
+'   stats-rows=unavailable extrapolated-rows=unavailable'
+'   table stats: rows=unavailable size=unavailable'
+'   column stats: unavailable'
+'   mem-estimate=16.00MB mem-reservation=0B'
+'   tuple-ids=0 row-size=4B cardinality=unavailable'
+---- TYPES
+STRING
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/tests/custom_cluster/test_stats_extrapolation.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_stats_extrapolation.py b/tests/custom_cluster/test_stats_extrapolation.py
new file mode 100644
index 0000000..06a2e84
--- /dev/null
+++ b/tests/custom_cluster/test_stats_extrapolation.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.test_dimensions import (
+    create_exec_option_dimension,
+    create_single_exec_option_dimension,
+    create_uncompressed_text_dimension)
+
+class TestStatsExtrapolation(CustomClusterTestSuite):
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestStatsExtrapolation, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  @CustomClusterTestSuite.with_args(impalad_args=('--enable_stats_extrapolation=true'))
+  def test_stats_extrapolation(self, vector, unique_database):
+    vector.get_value('exec_option')['num_nodes'] = 1
+    vector.get_value('exec_option')['explain_level'] = 2
+    self.run_test_case('QueryTest/stats-extrapolation', vector, unique_database)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/tests/metadata/test_explain.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py
index 1ee823f..506c618 100644
--- a/tests/metadata/test_explain.py
+++ b/tests/metadata/test_explain.py
@@ -70,7 +70,7 @@ class TestExplain(ImpalaTestSuite):
     vector.get_value('exec_option')['explain_level'] = 3
     self.run_test_case('QueryTest/explain-level3', vector)
 
-  def test_explain_validate_cardinality_estimates(self, vector):
+  def test_explain_validate_cardinality_estimates(self, vector, unique_database):
     # Tests that the cardinality estimates are correct for partitioned tables.
     # TODO Cardinality estimation tests should eventually be part of the planner tests.
     # TODO Remove this test
@@ -101,6 +101,37 @@ class TestExplain(ImpalaTestSuite):
         query_options={'explain_level':3})
     check_cardinality(result.data, '7300')
 
+    # Create a partitioned table with a mixed set of available stats,
+    mixed_tbl = unique_database + ".t"
+    self.execute_query(
+      "create table %s (c int) partitioned by (p int)" % mixed_tbl)
+    self.execute_query(
+      "insert into table %s partition (p) values(1,1),(2,2),(3,3)" % mixed_tbl)
+    # Set the number of rows at the table level.
+    self.execute_query(
+      "alter table %s set tblproperties('numRows'='100')" % mixed_tbl)
+    # Should fall back to table-level cardinality when partitions lack stats.
+    result = self.execute_query("explain select * from %s" % mixed_tbl,
+        query_options={'explain_level':3})
+    check_cardinality(result.data, '100')
+    # Should fall back to table-level cardinality, even for a subset of partitions,
+    result = self.execute_query("explain select * from %s where p = 1" % mixed_tbl,
+        query_options={'explain_level':3})
+    check_cardinality(result.data, '100')
+    # Set the number of rows for a single partition.
+    self.execute_query(
+      "alter table %s partition(p=1) set tblproperties('numRows'='50')" % mixed_tbl)
+    # Use partition stats when availabe. Partitions without stats are ignored.
+    result = self.execute_query("explain select * from %s" % mixed_tbl,
+        query_options={'explain_level':3})
+    check_cardinality(result.data, '50')
+    # Fall back to table-level stats when no selected partitions have stats.
+    result = self.execute_query("explain select * from %s where p = 2" % mixed_tbl,
+        query_options={'explain_level':3})
+    check_cardinality(result.data, '100')  
+
+
+
 class TestExplainEmptyPartition(ImpalaTestSuite):
   TEST_DB_NAME = "imp_1708"
 


[2/2] incubator-impala git commit: IMPALA-2373: Extrapolate row counts for HDFS tables.

Posted by ab...@apache.org.
IMPALA-2373: Extrapolate row counts for HDFS tables.

The main idea of this patch is to use table stats to
extrapolate the row counts for new/modified partitions.

Existing behavior:
- Partitions that lack the row count stat are ignored
  when estimating the cardinality of HDFS scans. Such
  partitions effectively have an estimated row count
  of zero.
- We always use the row count stats for partitions that
  have one. The row count may be innaccurate if data in
  such partitions has changed significantly.

Summary of changes:
- Enhance COMPUTE STATS to also store the total number
  of file bytes in the table.
- Use the table-level row count and file bytes stats
  to estimate the number of rows in a scan.
- A new impalad startup flag is added to enable/disable
  the extrapolation behavior. The feature is disabled by
  default. Note that even with the feature disabled,
  COMPUTE STATS stores the file bytes so you can enable
  the feature without having to run COMPUTE STATS again.

Testing:
- Added new FE unit test
- Added new EE test

Change-Id: I972c8a03ed70211734631a7dc9085cb33622ebc4
Reviewed-on: http://gerrit.cloudera.org:8080/6840
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e89d7057
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e89d7057
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e89d7057

Branch: refs/heads/master
Commit: e89d7057a6c759b91a61deef8c5ce2d91a049408
Parents: bad10da
Author: Alex Behm <al...@cloudera.com>
Authored: Fri May 5 10:44:03 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 26 21:06:17 2017 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   4 +
 be/src/exec/catalog-op-executor.cc              |   4 +
 be/src/util/backend-gflag-util.cc               |   3 +-
 common/thrift/BackendGflags.thrift              |   2 +
 common/thrift/CatalogObjects.thrift             |   5 +-
 common/thrift/JniCatalog.thrift                 |   4 +
 .../impala/analysis/ComputeStatsStmt.java       |   4 +
 .../apache/impala/catalog/DataSourceTable.java  |   4 +-
 .../org/apache/impala/catalog/HBaseTable.java   |   2 +-
 .../org/apache/impala/catalog/HdfsTable.java    |  63 ++++++--
 .../org/apache/impala/catalog/KuduTable.java    |   2 +-
 .../java/org/apache/impala/catalog/Table.java   |  42 +++--
 .../java/org/apache/impala/catalog/View.java    |   4 +-
 .../org/apache/impala/planner/HdfsScanNode.java | 114 +++++++++----
 .../org/apache/impala/planner/ScanNode.java     |  23 ++-
 .../apache/impala/service/BackendConfig.java    |   3 +
 .../impala/service/CatalogOpExecutor.java       |  28 ++--
 .../catalog/CatalogObjectToFromThriftTest.java  |   2 +-
 .../impala/planner/StatsExtrapolationTest.java  | 161 +++++++++++++++++++
 .../queries/PlannerTest/constant-folding.test   |  42 +++--
 .../queries/PlannerTest/mt-dop-validation.test  |  32 ++--
 .../queries/PlannerTest/parquet-filtering.test  |  45 +++---
 .../PlannerTest/resource-requirements.test      | 118 +++++++++-----
 .../PlannerTest/sort-expr-materialization.test  |  24 ++-
 .../queries/PlannerTest/tablesample.test        |  54 ++++---
 .../queries/QueryTest/explain-level2.test       |   6 +-
 .../queries/QueryTest/explain-level3.test       |   6 +-
 .../queries/QueryTest/stats-extrapolation.test  | 148 +++++++++++++++++
 .../custom_cluster/test_stats_extrapolation.py  |  41 +++++
 tests/metadata/test_explain.py                  |  33 +++-
 30 files changed, 817 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index cc80011..dd170b7 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -131,3 +131,7 @@ DEFINE_int64(inc_stats_size_limit_bytes, 200 * (1LL<<20), "Maximum size of "
     "This limit is set as a safety check, to prevent the JVM from "
     "hitting a maximum array limit of 1GB (or OOM) while building "
     "the thrift objects to send to impalads. By default, it's set to 200MB");
+
+DEFINE_bool(enable_stats_extrapolation, false,
+    "If true, uses table statistics computed with COMPUTE STATS "
+    "to extrapolate the row counts of partitions.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index f3aed05..11c21cc 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -112,6 +112,10 @@ Status CatalogOpExecutor::ExecComputeStats(
   // Fill the alteration request based on the child-query results.
   SetTableStats(tbl_stats_schema, tbl_stats_data,
       compute_stats_params.existing_part_stats, &update_stats_params);
+  if (compute_stats_params.__isset.total_file_bytes) {
+    update_stats_params.table_stats.__set_total_file_bytes(
+        compute_stats_params.total_file_bytes);
+  }
   // col_stats_schema and col_stats_data will be empty if there was no column stats query.
   if (!col_stats_schema.columns.empty()) {
     if (compute_stats_params.is_incremental) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 0781393..96c8f87 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -26,6 +26,7 @@
 // Configs for the Frontend and the Catalog.
 DECLARE_bool(load_catalog_in_background);
 DECLARE_bool(load_auth_to_local_rules);
+DECLARE_bool(enable_stats_extrapolation);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(read_size);
 DECLARE_int32(num_metadata_loading_threads);
@@ -43,7 +44,6 @@ DECLARE_string(authorized_proxy_user_config);
 DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_string(kudu_master_hosts);
 DECLARE_string(sentry_config);
-DECLARE_string(sentry_config);
 
 namespace impala {
 
@@ -67,6 +67,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_impala_log_lvl(FlagToTLogLevel(FLAGS_v));
   cfg.__set_non_impala_java_vlog(FlagToTLogLevel(FLAGS_non_impala_java_vlog));
   cfg.__set_inc_stats_size_limit_bytes(FLAGS_inc_stats_size_limit_bytes);
+  cfg.__set_enable_stats_extrapolation(FLAGS_enable_stats_extrapolation);
   cfg.__set_lineage_event_log_dir(FLAGS_lineage_event_log_dir);
   cfg.__set_local_library_path(FLAGS_local_library_dir);
   cfg.__set_kudu_operation_timeout_ms(FLAGS_kudu_operation_timeout_ms);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 09cf6f7..c76d8e0 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -31,6 +31,8 @@ struct TBackendGflags {
 
   5: required i64 inc_stats_size_limit_bytes
 
+ 19: required bool enable_stats_extrapolation
+
   6: required string lineage_event_log_dir
 
   7: required bool load_catalog_in_background

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index eeeae44..51ab922 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -135,7 +135,10 @@ struct TTableName {
 
 struct TTableStats {
   // Estimated number of rows in the table or -1 if unknown
-  1: required i64 num_rows;
+  1: required i64 num_rows
+
+  // Sum of file sizes in the table. Only set for tables of type HDFS_TABLE.
+  2: optional i64 total_file_bytes
 }
 
 // Column stats data that Impala uses.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 507aa2e..008b60e 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -497,6 +497,10 @@ struct TComputeStatsParams {
   // The number of partition columns for the target table. Only set if this is_incremental
   // is true.
   8: optional i32 num_partition_cols
+
+  // Sum of file sizes in the table. Only set for tables of type HDFS_TABLE and if
+  // is_incremental is false.
+  9: optional i64 total_file_bytes
 }
 
 // Parameters for CREATE/DROP ROLE

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index a4552a6..d806521 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -280,6 +280,7 @@ public class ComputeStatsStmt extends StatementBase {
         partitionSet_.setPartitionShouldExist();
         partitionSet_.analyze(analyzer);
       }
+
       // For incremental stats, estimate the size of intermediate stats and report an
       // error if the estimate is greater than --inc_stats_size_limit_bytes in bytes
       if (isIncremental_) {
@@ -581,6 +582,9 @@ public class ComputeStatsStmt extends StatementBase {
     if (isIncremental_) {
       params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols());
     }
+    if (table_ instanceof HdfsTable && !isIncremental_) {
+      params.setTotal_file_bytes(((HdfsTable)table_).getTotalHdfsBytes());
+    }
     return params;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index a1597b9..e9e26b6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -186,7 +186,7 @@ public class DataSourceTable extends Table {
       loadColumns(fieldSchemas, client);
 
       // Set table stats.
-      numRows_ = getRowCount(super.getMetaStoreTable().getParameters());
+      setTableStats(msTable_);
     } catch (Exception e) {
       throw new TableLoadingException("Failed to load metadata for data source table: " +
           name_, e);
@@ -216,7 +216,7 @@ public class DataSourceTable extends Table {
     resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
     result.setSchema(resultSchema);
     TResultRowBuilder rowBuilder = new TResultRowBuilder();
-    rowBuilder.add(numRows_);
+    rowBuilder.add(tableStats_.num_rows);
     result.addToRows(rowBuilder.get());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index c901f1d..4f60c96 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -408,7 +408,7 @@ public class HBaseTable extends Table {
       }
 
       // Set table stats.
-      numRows_ = getRowCount(super.getMetaStoreTable().getParameters());
+      setTableStats(msTable_);
 
       // since we don't support composite hbase rowkeys yet, all hbase tables have a
       // single clustering col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index d8d8d4a..bbdbd16 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1057,7 +1057,7 @@ public class HdfsTable extends Table {
         // This is the special case of CTAS that creates a 'temp' table that does not
         // actually exist in the Hive Metastore.
         initializePartitionMetadata(msTbl);
-        updateStatsFromHmsTable(msTbl);
+        setTableStats(msTbl);
         return;
       }
       // Load partition and file metadata
@@ -1082,7 +1082,7 @@ public class HdfsTable extends Table {
         loadAllPartitions(msPartitions, msTbl);
       }
       if (loadTableSchema) setAvroSchema(client, msTbl);
-      updateStatsFromHmsTable(msTbl);
+      setTableStats(msTbl);
       numHdfsFiles_ = -1;
       totalHdfsBytes_ = -1;
     } catch (TableLoadingException e) {
@@ -1232,13 +1232,8 @@ public class HdfsTable extends Table {
     return partitions;
   }
 
-  /**
-   * Updates the cardinality of this table from an HMS table. Sets the cardinalities of
-   * dummy/default partitions for the case of unpartitioned tables.
-   */
-  private void updateStatsFromHmsTable(
-      org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    numRows_ = getRowCount(msTbl.getParameters());
+  public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    super.setTableStats(msTbl);
     // For unpartitioned tables set the numRows in its partitions
     // to the table's numRows.
     if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) {
@@ -1246,7 +1241,7 @@ public class HdfsTable extends Table {
       // Temp tables used in CTAS statements have one partition.
       Preconditions.checkState(partitionMap_.size() == 2 || partitionMap_.size() == 1);
       for (HdfsPartition p: partitionMap_.values()) {
-        p.setNumRows(numRows_);
+        p.setNumRows(getNumRows());
       }
     }
   }
@@ -1765,6 +1760,29 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Returns an estimated row count for the given number of file bytes. The row count is
+   * extrapolated using the table-level row count and file bytes statistics.
+   * Returns zero only if the given file bytes is zero.
+   * Returns -1 if:
+   * - stats extrapolation has been disabled
+   * - the given file bytes statistic is negative
+   * - the row count or the file byte statistic is missing
+   * - the file bytes statistic is zero or negative
+   * - the row count statistic is zero and the file bytes is non-zero
+   * Otherwise, returns a value >= 1.
+   */
+  public long getExtrapolatedNumRows(long fileBytes) {
+    if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) return -1;
+    if (fileBytes == 0) return 0;
+    if (fileBytes < 0) return -1;
+    if (tableStats_.num_rows < 0 || tableStats_.total_file_bytes <= 0) return -1;
+    if (tableStats_.num_rows == 0 && tableStats_.total_file_bytes != 0) return -1;
+    double rowsPerByte = tableStats_.num_rows / (double) tableStats_.total_file_bytes;
+    double extrapolatedNumRows = fileBytes * rowsPerByte;
+    return (long) Math.max(1, Math.round(extrapolatedNumRows));
+  }
+
+  /**
    * Returns statistics on this table as a tabular result set. Used for the
    * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
    * inside this method.
@@ -1781,7 +1799,12 @@ public class HdfsTable extends Table {
       resultSchema.addToColumns(colDesc);
     }
 
+    boolean statsExtrap = BackendConfig.INSTANCE.enableStatsExtrapolation();
+
     resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
+    if (statsExtrap) {
+      resultSchema.addToColumns(new TColumn("Extrap #Rows", Type.BIGINT.toThrift()));
+    }
     resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift()));
     resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
     resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift()));
@@ -1806,9 +1829,13 @@ public class HdfsTable extends Table {
         rowBuilder.add(expr.getStringValue());
       }
 
-      // Add number of rows, files, bytes, cache stats, and file format.
-      rowBuilder.add(p.getNumRows()).add(p.getFileDescriptors().size())
-          .addBytes(p.getSize());
+      // Add rows, extrapolated rows, files, bytes, cache stats, and file format.
+      rowBuilder.add(p.getNumRows());
+      // Compute and report the extrapolated row count because the set of files could
+      // have changed since we last computed stats for this partition. We also follow
+      // this policy during scan-cardinality estimation.
+      if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(p.getSize()));
+      rowBuilder.add(p.getFileDescriptors().size()).addBytes(p.getSize());
       if (!p.isMarkedCached()) {
         // Helps to differentiate partitions that have 0B cached versus partitions
         // that are not marked as cached.
@@ -1853,8 +1880,14 @@ public class HdfsTable extends Table {
         rowBuilder.add("");
       }
 
-      // Total num rows, files, and bytes (leave format empty).
-      rowBuilder.add(numRows_).add(numHdfsFiles_).addBytes(totalHdfsBytes_)
+      // Total rows, extrapolated rows, files, bytes, cache stats.
+      // Leave format empty.
+      rowBuilder.add(getNumRows());
+      // Compute and report the extrapolated row count because the set of files could
+      // have changed since we last computed stats for this partition. We also follow
+      // this policy during scan-cardinality estimation.
+      if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(totalHdfsBytes_));
+      rowBuilder.add(numHdfsFiles_).addBytes(totalHdfsBytes_)
           .addBytes(totalCachedBytes).add("").add("").add("").add("");
       result.addToRows(rowBuilder.get());
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index af3fb46..a79ffe0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -201,7 +201,7 @@ public class KuduTable extends Table {
     kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
     Preconditions.checkNotNull(kuduMasters_);
     org.apache.kudu.client.KuduTable kuduTable = null;
-    numRows_ = getRowCount(msTable_.getParameters());
+    setTableStats(msTable_);
 
     // Connect to Kudu to retrieve table metadata
     try (KuduClient kuduClient = KuduUtil.createKuduClient(getKuduMasterHosts())) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index c52e6fa..6b27353 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -73,8 +73,9 @@ public abstract class Table implements CatalogObject {
   // Number of clustering columns.
   protected int numClusteringCols_;
 
-  // estimated number of rows in table; -1: unknown.
-  protected long numRows_ = -1;
+  // Contains the estimated number of rows and optional file bytes. Non-null. Member
+  // values of -1 indicate an unknown statistic.
+  protected TTableStats tableStats_;
 
   // colsByPos[i] refers to the ith column in the table. The first numClusteringCols are
   // the clustering columns.
@@ -100,6 +101,8 @@ public abstract class Table implements CatalogObject {
     owner_ = owner;
     lastDdlTime_ = (msTable_ != null) ?
         CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
+    tableStats_ = new TTableStats(-1);
+    tableStats_.setTotal_file_bytes(-1);
   }
 
   public ReentrantLock getLock() { return tableLock_; }
@@ -121,6 +124,14 @@ public abstract class Table implements CatalogObject {
   public abstract void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException;
 
+  /**
+   * Sets 'tableStats_' by extracting the table statistics from the given HMS table.
+   */
+  public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    tableStats_ = new TTableStats(getRowCount(msTbl.getParameters()));
+    tableStats_.setTotal_file_bytes(getRawDataSize(msTbl.getParameters()));
+  }
+
   public void addColumn(Column col) {
     colsByPos_.add(col);
     colsByName_.put(col.getName().toLowerCase(), col);
@@ -199,11 +210,19 @@ public abstract class Table implements CatalogObject {
    * Returns the value of the ROW_COUNT constant, or -1 if not found.
    */
   protected static long getRowCount(Map<String, String> parameters) {
+    return getLongParam(StatsSetupConst.ROW_COUNT, parameters);
+  }
+
+  protected static long getRawDataSize(Map<String, String> parameters) {
+    return getLongParam(StatsSetupConst.RAW_DATA_SIZE, parameters);
+  }
+
+  private static long getLongParam(String key, Map<String, String> parameters) {
     if (parameters == null) return -1;
-    String numRowsStr = parameters.get(StatsSetupConst.ROW_COUNT);
-    if (numRowsStr == null) return -1;
+    String value = parameters.get(key);
+    if (value == null) return -1;
     try {
-      return Long.valueOf(numRowsStr);
+      return Long.valueOf(value);
     } catch (NumberFormatException exc) {
       // ignore
     }
@@ -279,10 +298,7 @@ public abstract class Table implements CatalogObject {
     }
 
     numClusteringCols_ = thriftTable.getClustering_columns().size();
-
-    // Estimated number of rows
-    numRows_ = thriftTable.isSetTable_stats() ?
-        thriftTable.getTable_stats().getNum_rows() : -1;
+    if (thriftTable.isSetTable_stats()) tableStats_ = thriftTable.getTable_stats();
 
     // Default to READ_WRITE access if the field is not set.
     accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() :
@@ -337,10 +353,7 @@ public abstract class Table implements CatalogObject {
     }
 
     table.setMetastore_table(getMetaStoreTable());
-    if (numRows_ != -1) {
-      table.setTable_stats(new TTableStats());
-      table.getTable_stats().setNum_rows(numRows_);
-    }
+    table.setTable_stats(tableStats_);
     return table;
   }
 
@@ -473,7 +486,8 @@ public abstract class Table implements CatalogObject {
     numClusteringCols_ = n;
   }
 
-  public long getNumRows() { return numRows_; }
+  public long getNumRows() { return tableStats_.num_rows; }
+  public TTableStats getTTableStats() { return tableStats_; }
   public ArrayType getType() { return type_; }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/catalog/View.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/View.java b/fe/src/main/java/org/apache/impala/catalog/View.java
index b82c489..c1f1a9b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/View.java
+++ b/fe/src/main/java/org/apache/impala/catalog/View.java
@@ -31,6 +31,7 @@ import org.apache.impala.analysis.SqlScanner;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTableType;
 import com.google.common.collect.Lists;
 
@@ -114,7 +115,8 @@ public class View extends Table {
       }
       // These fields are irrelevant for views.
       numClusteringCols_ = 0;
-      numRows_ = -1;
+      tableStats_ = new TTableStats(-1);
+      tableStats_.setTotal_file_bytes(-1);
       init();
     } catch (TableLoadingException e) {
       throw e;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 47720fd..9fb87e0 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -132,6 +132,11 @@ public class HdfsScanNode extends ScanNode {
   private long totalFiles_ = 0;
   private long totalBytes_ = 0;
 
+  // Input cardinality based on the partition row counts or extrapolation.
+  // -1 if invalid.
+  private long statsNumRows_ = -1;
+  private long extrapolatedNumRows_ = -1;
+
   // True if this scan node should use the MT implementation in the backend.
   private boolean useMtScanNode_;
 
@@ -649,8 +654,21 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Also computes totalBytes_, totalFiles_, numPartitionsMissingStats_,
-   * and sets hasCorruptTableStats_.
+   * Computes and sets the following members.
+   * inputCardinality_, cardinality_, numHosts_, statsNumRows_, extrapolatedNumRows_,
+   * numPartitionsMissingStats_, and hasCorruptTableStats_.
+   *
+   * Row count extrapolation
+   * If available, table-level row count and file bytes statistics are used for
+   * extrapolating the input cardinality (before conjuncts). The extrapolation is based
+   * on the total number of bytes to be scanned and is intended to address the following
+   * scenarios: (1) new partitions that have no stats, and (2) existing partitions which
+   * have changed since the last stats collection. When extrapolating, the per-partition
+   * row counts are ignored because we cannot determine whether the partition has changed
+   * since the last stats collection.
+   * Otherwise, the input cardinality is based on the per-partition row count stats
+   * and/or the table-level row count stats, depending on which of those are available.
+   * Partitions without stats are ignored.
    */
   @Override
   public void computeStats(Analyzer analyzer) {
@@ -659,20 +677,16 @@ public class HdfsScanNode extends ScanNode {
       LOG.trace("collecting partitions for table " + tbl_.getName());
     }
     numPartitionsMissingStats_ = 0;
+    statsNumRows_ = -1;
     if (tbl_.getNumClusteringCols() == 0) {
-      cardinality_ = tbl_.getNumRows();
-      if (cardinality_ < -1 || (cardinality_ == 0 && tbl_.getTotalHdfsBytes() > 0)) {
+      statsNumRows_ = tbl_.getNumRows();
+      if (statsNumRows_ < -1 || (statsNumRows_ == 0 && tbl_.getTotalHdfsBytes() > 0)) {
         hasCorruptTableStats_ = true;
       }
-      if (partitions_.isEmpty()) {
-        // Nothing to scan. Definitely a cardinality of 0 even if we have no stats.
-        cardinality_ = 0;
-      } else {
+      if (!partitions_.isEmpty()) {
         Preconditions.checkState(partitions_.size() == 1);
       }
     } else {
-      cardinality_ = 0;
-      boolean hasValidPartitionCardinality = false;
       for (HdfsPartition p: partitions_) {
         // Check for corrupt table stats
         if (p.getNumRows() < -1  || (p.getNumRows() == 0 && p.getSize() > 0))  {
@@ -681,24 +695,56 @@ public class HdfsScanNode extends ScanNode {
         // ignore partitions with missing stats in the hope they don't matter
         // enough to change the planning outcome
         if (p.getNumRows() > -1) {
-          cardinality_ = addCardinalities(cardinality_, p.getNumRows());
-          hasValidPartitionCardinality = true;
+          if (statsNumRows_ == -1) statsNumRows_ = 0;
+          statsNumRows_ = addCardinalities(statsNumRows_, p.getNumRows());
         } else {
           ++numPartitionsMissingStats_;
         }
       }
-      if (!partitions_.isEmpty() && !hasValidPartitionCardinality) {
-        // if none of the partitions knew its number of rows, we fall back on
-        // the table stats
-        cardinality_ = tbl_.getNumRows();
-      }
     }
+    extrapolatedNumRows_ = tbl_.getExtrapolatedNumRows(totalBytes_);
+    computeCardinalities();
+    computeNumNodes(analyzer, cardinality_);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("computeStats HdfsScan: #nodes=" + Integer.toString(numNodes_));
+    }
+  }
 
-    // Adjust the cardinality based on table sampling.
-    if (sampleParams_ != null && cardinality_ != -1) {
-      double fracPercBytes = (double) sampleParams_.getPercentBytes() / 100;
-      cardinality_ = Math.round(cardinality_ * fracPercBytes);
-      cardinality_ = Math.max(cardinality_, 1);
+  /**
+   * Computes and sets the input and output cardinalities, choosing between the
+   * 'extrapolatedNumRows_' and 'statsNumRows_'.
+   * Adjusts the output cardinality based on the scan conjuncts and table sampling.
+   */
+  private void computeCardinalities() {
+    Preconditions.checkState(statsNumRows_ >= -1 || hasCorruptTableStats_);
+    Preconditions.checkState(extrapolatedNumRows_ >= -1);
+
+    if (totalBytes_ == 0) {
+      // Nothing to scan. Definitely a cardinality of 0.
+      inputCardinality_ = 0;
+      cardinality_ = 0;
+      return;
+    }
+
+    // Choose between the extrapolated row count and the one based on stored stats.
+    if (extrapolatedNumRows_ != -1) {
+      // The extrapolated row count is based on the 'totalBytes_' which already accounts
+      // for table sampling, so no additional adjustment for sampling is necessary.
+      cardinality_ = extrapolatedNumRows_;
+    } else {
+      if (!partitions_.isEmpty() && numPartitionsMissingStats_ == partitions_.size()) {
+        // if none of the partitions knew its number of rows, and extrapolation was
+        // not possible, we fall back on the table stats
+        cardinality_ = tbl_.getNumRows();
+      } else {
+        cardinality_ = statsNumRows_;
+      }
+      // Adjust the cardinality based on table sampling.
+      if (sampleParams_ != null && cardinality_ != -1) {
+        double fracPercBytes = (double) sampleParams_.getPercentBytes() / 100;
+        cardinality_ = Math.round(cardinality_ * fracPercBytes);
+        cardinality_ = Math.max(cardinality_, 1);
+      }
     }
 
     // Adjust cardinality for all collections referenced along the tuple's path.
@@ -726,12 +772,7 @@ public class HdfsScanNode extends ScanNode {
     }
     cardinality_ = capAtLimit(cardinality_);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("computeStats HdfsScan: cardinality_=" + Long.toString(cardinality_));
-    }
-
-    computeNumNodes(analyzer, cardinality_);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("computeStats HdfsScan: #nodes=" + Integer.toString(numNodes_));
+      LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_));
     }
   }
 
@@ -859,6 +900,17 @@ public class HdfsScanNode extends ScanNode {
       }
     }
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+      String extrapRows = String.valueOf(extrapolatedNumRows_);
+      if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) {
+        extrapRows = "disabled";
+      } else if (extrapolatedNumRows_ == -1) {
+        extrapRows = "unavailable";
+      }
+      String statsRows = String.valueOf(statsNumRows_);
+      if (statsNumRows_ == -1) statsRows = "unavailable";
+      output.append(String.format(
+          "%sstats-rows=%s extrapolated-rows=%s", detailPrefix, statsRows, extrapRows));
+      output.append("\n");
       output.append(getStatsExplainString(detailPrefix, detailLevel));
       output.append("\n");
       if (numScanRangesNoDiskIds_ > 0) {
@@ -964,6 +1016,12 @@ public class HdfsScanNode extends ScanNode {
   }
 
   @Override
+  public boolean isTableMissingTableStats() {
+    if (extrapolatedNumRows_ >= 0) return false;
+    return super.isTableMissingTableStats();
+  }
+
+  @Override
   public boolean hasCorruptTableStats() { return hasCorruptTableStats_; }
 
   public boolean hasMissingDiskIds() { return numScanRangesNoDiskIds_ > 0; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index c1b6858..b85532f 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -23,11 +23,15 @@ import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TTableStats;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -107,14 +111,17 @@ abstract public class ScanNode extends PlanNode {
   protected String getStatsExplainString(String prefix, TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();
     // Table stats.
-    if (desc_.getTable().getNumRows() == -1) {
-      output.append(prefix + "table stats: unavailable");
-    } else {
-      output.append(prefix + "table stats: " + desc_.getTable().getNumRows() +
-          " rows total");
-      if (numPartitionsMissingStats_ > 0) {
-        output.append(" (" + numPartitionsMissingStats_ + " partition(s) missing stats)");
-      }
+    TTableStats tblStats = desc_.getTable().getTTableStats();
+    String numRows = String.valueOf(tblStats.num_rows);
+    if (tblStats.num_rows == -1) numRows = "unavailable";
+    output.append(prefix + "table stats: rows=" + numRows);
+    if (desc_.getTable() instanceof HdfsTable) {
+      String totalBytes = PrintUtils.printBytes(tblStats.total_file_bytes);
+      if (tblStats.total_file_bytes == -1) totalBytes = "unavailable";
+      output.append(" size=" + totalBytes);
+    }
+    if (tblStats.num_rows != -1 && numPartitionsMissingStats_ > 0) {
+      output.append(" (" + numPartitionsMissingStats_ + " partition(s) missing stats)");
     }
     output.append("\n");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 5a4a440..80f8ace 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -49,6 +49,9 @@ public class BackendConfig {
     return !Strings.isNullOrEmpty(backendCfg_.lineage_event_log_dir);
   }
   public long getIncStatsMaxSize() { return backendCfg_.inc_stats_size_limit_bytes; }
+  public boolean enableStatsExtrapolation() {
+    return backendCfg_.enable_stats_extrapolation;
+  }
   public boolean isAuthToLocalEnabled() {
     return backendCfg_.load_auth_to_local_rules &&
         !Strings.isNullOrEmpty(backendCfg_.principal);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index c0246e8..e634a65 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -17,12 +17,6 @@
 
 package org.apache.impala.service;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -34,10 +28,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -138,8 +130,8 @@ import org.apache.impala.thrift.TGrantRevokePrivParams;
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
 import org.apache.impala.thrift.THdfsCachingOp;
 import org.apache.impala.thrift.THdfsFileFormat;
-import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPartitionDef;
+import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TResetMetadataRequest;
@@ -159,6 +151,12 @@ import org.apache.impala.util.MetaStoreUtil;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
  * Class used to execute Catalog Operations, including DDL and refresh/invalidate
  * metadata requests. Acts as a bridge between the Thrift catalog operation requests
@@ -827,9 +825,13 @@ public class CatalogOpExecutor {
       }
     }
 
-    // Update the table's ROW_COUNT parameter.
+    // Update the table's ROW_COUNT and RAW_DATA_SIZE parameters.
     msTbl.putToParameters(StatsSetupConst.ROW_COUNT,
         String.valueOf(params.getTable_stats().num_rows));
+    if (params.getTable_stats().isSetTotal_file_bytes()) {
+      msTbl.putToParameters(StatsSetupConst.RAW_DATA_SIZE,
+          String.valueOf(params.getTable_stats().total_file_bytes));
+    }
     Pair<String, String> statsTaskParam = MetastoreShim.statsGeneratedViaStatsTaskParam();
     msTbl.putToParameters(statsTaskParam.first, statsTaskParam.second);
     return numTargetedPartitions;
@@ -1194,7 +1196,11 @@ public class CatalogOpExecutor {
     // Delete the ROW_COUNT from the table (if it was set).
     org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
     int numTargetedPartitions = 0;
-    if (msTbl.getParameters().remove(StatsSetupConst.ROW_COUNT) != null) {
+    boolean droppedRowCount =
+        msTbl.getParameters().remove(StatsSetupConst.ROW_COUNT) != null;
+    boolean droppedRawDataSize =
+        msTbl.getParameters().remove(StatsSetupConst.RAW_DATA_SIZE) != null;
+    if (droppedRowCount || droppedRawDataSize) {
       applyAlterTable(msTbl);
       ++numTargetedPartitions;
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 67005be..6f63380 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -87,7 +87,7 @@ public class CatalogObjectToFromThriftTest {
       Assert.assertEquals(newTable.name_, thriftTable.tbl_name);
       Assert.assertEquals(newTable.numClusteringCols_, 2);
       // Currently only have table stats on "functional.alltypes"
-      if (dbName.equals("functional")) Assert.assertEquals(7300, newTable.numRows_);
+      if (dbName.equals("functional")) Assert.assertEquals(7300, newTable.getNumRows());
 
       HdfsTable newHdfsTable = (HdfsTable) newTable;
       Assert.assertEquals(newHdfsTable.getPartitions().size(), 25);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java b/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
new file mode 100644
index 0000000..f1e1a70
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.common.FrontendTestBase;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBackendGflags;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Tests the behavior of stats extrapolation with valid, invalid, and unset stats,
+ * as well as extreme values and other edge cases.
+ */
+public class StatsExtrapolationTest extends FrontendTestBase {
+
+  /**
+   * Sets the row count and raw data size stats in the given table.
+   * Unsets the corresponding statistic if a null value is passed.
+   */
+  private void setStats(Table tbl, Long rowCount, Long rawDataSize) {
+    org.apache.hadoop.hive.metastore.api.Table msTbl =
+        new org.apache.hadoop.hive.metastore.api.Table();
+    msTbl.setParameters(new HashMap<String, String>());
+    if (rowCount != null) {
+      msTbl.getParameters().put(StatsSetupConst.ROW_COUNT,
+          String.valueOf(rowCount));
+    }
+    if (rawDataSize != null) {
+      msTbl.getParameters().put(StatsSetupConst.RAW_DATA_SIZE,
+          String.valueOf(rawDataSize));
+    }
+    tbl.setMetaStoreTable(msTbl);
+    tbl.setTableStats(msTbl);
+  }
+
+  private void runTest(Table tbl, Long rowCount, Long rawDataSize,
+      long fileBytes, long expectedExtrapNumRows) {
+    Preconditions.checkState(tbl instanceof HdfsTable);
+    setStats(tbl, rowCount, rawDataSize);
+    long actualExrtapNumRows = ((HdfsTable)tbl).getExtrapolatedNumRows(fileBytes);
+    assertEquals(expectedExtrapNumRows, actualExrtapNumRows);
+  }
+
+  private void testInvalidStats(Table tbl, Long rowCount, Long rawDataSize) {
+    runTest(tbl, rowCount, rawDataSize, 0, 0);
+    runTest(tbl, rowCount, rawDataSize, 1, -1);
+    runTest(tbl, rowCount, rawDataSize, 100, -1);
+    runTest(tbl, rowCount, rawDataSize, 1000000000, -1);
+    runTest(tbl, rowCount, rawDataSize, Long.MAX_VALUE, -1);
+    runTest(tbl, rowCount, rawDataSize, Long.MIN_VALUE, -1);
+  }
+
+  @Test
+  public void TestStatsExtrapolation() {
+    addTestDb("extrap_stats", null);
+    Table tbl = addTestTable("create table extrap_stats.t (i int)");
+
+    // Replace/restore the static backend config for this test.
+    BackendConfig origInstance = BackendConfig.INSTANCE;
+    try {
+      // Create a fake config with extrapolation enabled.
+      TBackendGflags testGflags = new TBackendGflags();
+      testGflags.setEnable_stats_extrapolation(true);
+      BackendConfig.create(testGflags);
+
+      // Both stats are set to a meaningful value.
+      runTest(tbl, 100L, 1000L, 0, 0);
+      runTest(tbl, 100L, 1000L, 100, 10);
+      runTest(tbl, 100L, 1000L, 1000000000, 100000000);
+      runTest(tbl, 100L, 1000L, Long.MAX_VALUE, 922337203685477632L);
+      runTest(tbl, 100L, 1000L, -100, -1);
+      // The extrapolated number of rows should double/triple when the
+      // actual data volume doubles/triples.
+      runTest(tbl, 1000000000L, 123456789L, 123456789*2, 2000000000L);
+      runTest(tbl, 1000000000L, 123456789L, 123456789*3, 3000000000L);
+      runTest(tbl, 7777777777L, 33333333L, 33333333L*2, 15555555554L);
+      runTest(tbl, 7777777777L, 33333333L, 33333333L*3, 23333333331L);
+      // Very small row count and very big raw data size.
+      runTest(tbl, 1L, Long.MAX_VALUE, 1, 1);
+      runTest(tbl, 1L, Long.MAX_VALUE, 100, 1);
+      runTest(tbl, 1L, Long.MAX_VALUE, 1000000000, 1);
+      runTest(tbl, 1L, Long.MAX_VALUE, Long.MAX_VALUE, 1);
+      runTest(tbl, 1L, Long.MAX_VALUE, -100, -1);
+      // Very large row count and very small raw data size.
+      runTest(tbl, Long.MAX_VALUE, 1L, 1, Long.MAX_VALUE);
+      runTest(tbl, Long.MAX_VALUE, 1L, 100, Long.MAX_VALUE);
+      runTest(tbl, Long.MAX_VALUE, 1L, 1000000000, Long.MAX_VALUE);
+      runTest(tbl, Long.MAX_VALUE, 1L, Long.MAX_VALUE, Long.MAX_VALUE);
+      runTest(tbl, Long.MAX_VALUE, 1L, -100, -1);
+
+      // No stats are set.
+      testInvalidStats(tbl, null, null);
+      // Only one of the stats fields is set.
+      testInvalidStats(tbl, 100L, null);
+      testInvalidStats(tbl, null, 1000L);
+      // Stats are set to invalid values.
+      testInvalidStats(tbl, -100L, -1000L);
+      testInvalidStats(tbl, -100L, 1000L);
+      testInvalidStats(tbl, 100L, -1000L);
+      // Stats are zero.
+      runTest(tbl, 0L, 0L, 0, 0);
+      testInvalidStats(tbl, 0L, 0L);
+      testInvalidStats(tbl, 100L, 0L);
+      testInvalidStats(tbl, 0L, 1000L);
+
+      // Invalid file bytes input.
+      runTest(tbl, 100L, 1000L, -1, -1);
+      runTest(tbl, 100L, 1000L, Long.MIN_VALUE, -1);
+    } finally {
+      BackendConfig.INSTANCE = origInstance;
+    }
+  }
+
+  @Test
+  public void TestStatsExtrapolationDisabled() {
+    addTestDb("extrap_stats", null);
+    Table tbl = addTestTable("create table extrap_stats.t (i int)");
+
+    // Replace/restore the static backend config for this test.
+    BackendConfig origInstance = BackendConfig.INSTANCE;
+    try {
+      // Create a fake config with extrapolation disabled.
+      TBackendGflags testGflags = new TBackendGflags();
+      testGflags.setEnable_stats_extrapolation(false);
+      BackendConfig.create(testGflags);
+
+      // Always expect -1 even with legitimate stats.
+      runTest(tbl, 100L, 1000L, 0, -1);
+      runTest(tbl, 100L, 1000L, 100, -1);
+      runTest(tbl, 100L, 1000L, 1000000000, -1);
+      runTest(tbl, 100L, 1000L, Long.MAX_VALUE, -1);
+      runTest(tbl, 100L, 1000L, -100, -1);
+    } finally {
+      BackendConfig.INSTANCE = origInstance;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 0a30a97..3600aef 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -49,7 +49,8 @@ PLAN-ROOT SINK
    predicates: c_custkey > 10, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey = 4
    predicates on o_lineitems: 20 + l_linenumber < 0
-   table stats: 150000 rows total
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=292.36MB
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey > 10
    parquet dictionary predicates: c_custkey > 10
@@ -70,7 +71,7 @@ PLAN-ROOT SINK
    stop key: 20\0
    hbase filters: d:string_col EQUAL '4'
    predicates: tinyint_col = 5, string_col = '4'
-   table stats: 10000 rows total
+   table stats: rows=10000
    column stats: all
    mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=119B cardinality=1
@@ -110,7 +111,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=20B cardinality=7300
@@ -136,7 +138,8 @@ PLAN-ROOT SINK
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|     table stats: 7300 rows total
+|     stats-rows=7300 extrapolated-rows=disabled
+|     table stats: rows=7300 size=478.45KB
 |     column stats: all
 |     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     mem-estimate=128.00MB mem-reservation=0B
@@ -144,7 +147,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=7300
@@ -170,7 +174,8 @@ PLAN-ROOT SINK
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|     table stats: 7300 rows total
+|     stats-rows=7300 extrapolated-rows=disabled
+|     table stats: rows=7300 size=478.45KB
 |     column stats: all
 |     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     mem-estimate=128.00MB mem-reservation=0B
@@ -178,7 +183,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=7300
@@ -208,7 +214,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=20B cardinality=7300
@@ -236,7 +243,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=4B cardinality=7300
@@ -257,17 +265,18 @@ PLAN-ROOT SINK
 |  order by: greatest(20, bigint_col) ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=0B mem-reservation=16.00MB
-|  tuple-ids=3,2 row-size=37B cardinality=7300
+|  tuple-ids=3,2 row-size=61B cardinality=7300
 |
 01:SORT
 |  order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
 |  materialized: concat('ab', string_col), greatest(20, bigint_col)
 |  mem-estimate=16.00MB mem-reservation=48.00MB
-|  tuple-ids=3 row-size=29B cardinality=7300
+|  tuple-ids=3 row-size=53B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=29B cardinality=7300
@@ -287,7 +296,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=7300
@@ -304,7 +314,8 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + ye
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
-   table stats: 100 rows total
+   stats-rows=100 extrapolated-rows=disabled
+   table stats: rows=100 size=6.32KB
    column stats: all
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=100
@@ -328,7 +339,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    limit: 2
    mem-estimate=128.00MB mem-reservation=0B

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index e3dd297..8147d0c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -55,9 +55,10 @@ PLAN-ROOT SINK
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=156.57KB
+   partitions=24/24 files=24 size=178.13KB
    predicates: id < 10
-   table stats: unavailable
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
@@ -98,9 +99,10 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=156.57KB
+   partitions=24/24 files=24 size=178.13KB
    predicates: id < 10
-   table stats: unavailable
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
@@ -130,9 +132,10 @@ PLAN-ROOT SINK
 |  tuple-ids=4 row-size=8B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=156.57KB
+   partitions=24/24 files=24 size=178.13KB
    predicates: id < 10
-   table stats: unavailable
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
@@ -167,9 +170,10 @@ F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=9
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=156.57KB
+   partitions=24/24 files=24 size=178.13KB
    predicates: id < 10
-   table stats: unavailable
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
@@ -226,7 +230,8 @@ PLAN-ROOT SINK
    predicates: c_custkey < 10, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < 5
    predicates on o_lineitems: l_linenumber < 3
-   table stats: 150000 rows total
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=292.36MB
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey < 10
    parquet dictionary predicates: c_custkey < 10
@@ -283,7 +288,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
    predicates: c_custkey < 10, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < 5
    predicates on o_lineitems: l_linenumber < 3
-   table stats: 150000 rows total
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=292.36MB
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey < 10
    parquet dictionary predicates: c_custkey < 10
@@ -331,7 +337,8 @@ PLAN-ROOT SINK
    partitions=1/1 files=4 size=292.36MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < 5
-   table stats: 150000 rows total
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=292.36MB
    columns missing stats: c_orders, c_orders
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
@@ -377,7 +384,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
    partitions=1/1 files=4 size=292.36MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < 5
-   table stats: 150000 rows total
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=292.36MB
    columns missing stats: c_orders, c_orders
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 31451aa..f5ae46d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -17,9 +17,10 @@ PLAN-ROOT SINK
 |  tuple-ids=1 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=156.57KB
+   partitions=24/24 files=24 size=178.13KB
    predicates: int_col IS NULL, int_col > 1, int_col > tinyint_col, int_col * rand() > 50
-   table stats: unavailable
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
    column stats: unavailable
    parquet statistics predicates: int_col > 1
    parquet dictionary predicates: int_col > 1
@@ -45,9 +46,10 @@ PLAN-ROOT SINK
 |  tuple-ids=1 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=22/24 files=22 size=143.36KB
+   partitions=22/24 files=22 size=163.12KB
    predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
-   table stats: unavailable
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
    columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), date_string_col > '1993-10-01'
    parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
@@ -65,20 +67,21 @@ and mod(int_col,50) IN (0,1)
 and id IN (int_col);
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-  PLAN-ROOT SINK
-  |  mem-estimate=0B mem-reservation=0B
-  |
-  01:AGGREGATE [FINALIZE]
-  |  output: count(*)
-  |  mem-estimate=10.00MB mem-reservation=0B
-  |  tuple-ids=1 row-size=8B cardinality=1
-  |
-  00:SCAN HDFS [functional_parquet.alltypes]
-     partitions=24/24 files=24 size=173.09KB
-     predicates: id IN (int_col), id NOT IN (0, 1, 2), string_col IN ('aaaa', 'bbbb', 'cccc', NULL), mod(int_col, 50) IN (0, 1)
-     table stats: unavailable
-     column stats: unavailable
-     parquet dictionary predicates: id NOT IN (0, 1, 2), string_col IN ('aaaa', 'bbbb', 'cccc', NULL), mod(int_col, 50) IN (0, 1)
-     mem-estimate=48.00MB mem-reservation=0B
-     tuple-ids=0 row-size=24B cardinality=unavailable
-====
\ No newline at end of file
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+   predicates: id IN (int_col), id NOT IN (0, 1, 2), string_col IN ('aaaa', 'bbbb', 'cccc', NULL), mod(int_col, 50) IN (0, 1)
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
+   column stats: unavailable
+   parquet dictionary predicates: id NOT IN (0, 1, 2), string_col IN ('aaaa', 'bbbb', 'cccc', NULL), mod(int_col, 50) IN (0, 1)
+   mem-estimate=48.00MB mem-reservation=0B
+   tuple-ids=0 row-size=24B cardinality=unavailable
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 7ae30fa..fb0cd3d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -15,7 +15,8 @@ PLAN-ROOT SINK
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -34,7 +35,8 @@ PLAN-ROOT SINK
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -56,7 +58,8 @@ PLAN-ROOT SINK
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -75,7 +78,8 @@ PLAN-ROOT SINK
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -98,7 +102,7 @@ PLAN-ROOT SINK
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=100 instances=100
 00:SCAN HBASE [functional_hbase.alltypes]
-   table stats: unavailable
+   table stats: rows=unavailable
    column stats: unavailable
    mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=88B cardinality=14298
@@ -118,7 +122,7 @@ PLAN-ROOT SINK
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=100 instances=200
 00:SCAN HBASE [functional_hbase.alltypes]
-   table stats: unavailable
+   table stats: rows=unavailable
    column stats: unavailable
    mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=88B cardinality=14298
@@ -186,14 +190,16 @@ F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 |
 |--02:SCAN HDFS [tpch.lineitem, RANDOM]
 |     partitions=1/1 files=1 size=718.94MB
-|     table stats: 6001215 rows total
+|     stats-rows=6001215 extrapolated-rows=disabled
+|     table stats: rows=6001215 size=718.94MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=263B cardinality=6001215
 |
 01:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -217,14 +223,16 @@ F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 |
 |--02:SCAN HDFS [tpch.lineitem, RANDOM]
 |     partitions=1/1 files=1 size=718.94MB
-|     table stats: 6001215 rows total
+|     stats-rows=6001215 extrapolated-rows=disabled
+|     table stats: rows=6001215 size=718.94MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=263B cardinality=6001215
 |
 01:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -265,7 +273,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
@@ -301,7 +310,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
@@ -333,7 +343,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=0B mem-reservation=0B
    tuple-ids=0 row-size=0B cardinality=6001215
@@ -362,7 +373,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=0B cardinality=6001215
@@ -392,7 +404,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -417,7 +430,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -449,7 +463,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -475,7 +490,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.61MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.61MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -509,7 +525,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
-|     table stats: 1500000 rows total
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=162.56MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=191B cardinality=1500000
@@ -517,7 +534,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000 -> l_orderkey
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -554,7 +572,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
-|     table stats: 1500000 rows total
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=162.56MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=191B cardinality=1500000
@@ -562,7 +581,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000 -> l_orderkey
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -594,14 +614,16 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
-|     table stats: 1500000 rows total
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=162.56MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=191B cardinality=1500000
 |
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -635,14 +657,16 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
-|     table stats: 1500000 rows total
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=162.56MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=191B cardinality=1500000
 |
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
@@ -706,7 +730,8 @@ F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=3
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 00:SCAN HDFS [functional.alltypes, RANDOM]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=5B cardinality=7300
@@ -741,7 +766,8 @@ F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=6
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 00:SCAN HDFS [functional.alltypes, RANDOM]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=5B cardinality=7300
@@ -844,7 +870,8 @@ F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
 |  |
 |  03:SCAN HDFS [tpch.lineitem, RANDOM]
 |     partitions=1/1 files=1 size=718.94MB
-|     table stats: 6001215 rows total
+|     stats-rows=6001215 extrapolated-rows=disabled
+|     table stats: rows=6001215 size=718.94MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=3 row-size=16B cardinality=6001215
@@ -862,7 +889,8 @@ F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
 |  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  00:SCAN HDFS [tpch.customer, RANDOM]
 |     partitions=1/1 files=1 size=23.08MB
-|     table stats: 150000 rows total
+|     stats-rows=150000 extrapolated-rows=disabled
+|     table stats: rows=150000 size=23.08MB
 |     column stats: all
 |     mem-estimate=32.00MB mem-reservation=0B
 |     tuple-ids=0 row-size=42B cardinality=150000
@@ -881,7 +909,8 @@ F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
 |     runtime filters: RF000 -> o_orderkey, RF001 -> o_custkey
-|     table stats: 1500000 rows total
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=162.56MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=50B cardinality=1500000
@@ -894,7 +923,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 02:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000 -> tpch.lineitem.l_orderkey, RF002 -> l_orderkey
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
@@ -968,7 +998,8 @@ F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
 |  |
 |  03:SCAN HDFS [tpch.lineitem, RANDOM]
 |     partitions=1/1 files=1 size=718.94MB
-|     table stats: 6001215 rows total
+|     stats-rows=6001215 extrapolated-rows=disabled
+|     table stats: rows=6001215 size=718.94MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=3 row-size=16B cardinality=6001215
@@ -993,7 +1024,8 @@ F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
 |  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
 |  00:SCAN HDFS [tpch.customer, RANDOM]
 |     partitions=1/1 files=1 size=23.08MB
-|     table stats: 150000 rows total
+|     stats-rows=150000 extrapolated-rows=disabled
+|     table stats: rows=150000 size=23.08MB
 |     column stats: all
 |     mem-estimate=32.00MB mem-reservation=0B
 |     tuple-ids=0 row-size=42B cardinality=150000
@@ -1019,7 +1051,8 @@ F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
 |     runtime filters: RF000 -> o_orderkey, RF001 -> o_custkey
-|     table stats: 1500000 rows total
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=162.56MB
 |     column stats: all
 |     mem-estimate=88.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=50B cardinality=1500000
@@ -1032,7 +1065,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 02:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000 -> tpch.lineitem.l_orderkey, RF002 -> l_orderkey
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
@@ -1054,7 +1088,8 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |
 00:SCAN HDFS [functional.alltypes, RANDOM]
    partitions=1/24 files=1 size=20.36KB
-   table stats: 7300 rows total
+   stats-rows=310 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=89B cardinality=310
@@ -1069,7 +1104,8 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |
 00:SCAN HDFS [functional.alltypes, RANDOM]
    partitions=1/24 files=1 size=20.36KB
-   table stats: 7300 rows total
+   stats-rows=310 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=89B cardinality=310
@@ -1094,7 +1130,8 @@ WRITE TO HDFS [default.dummy_insert, OVERWRITE=false, PARTITION-KEYS=(l_partkey)
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=50B cardinality=6001215
@@ -1114,7 +1151,8 @@ WRITE TO HDFS [default.dummy_insert, OVERWRITE=false, PARTITION-KEYS=(l_partkey)
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
-   table stats: 6001215 rows total
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=718.94MB
    column stats: all
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=50B cardinality=6001215

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
index b7e22a9..5845af2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
@@ -13,7 +13,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=7300
@@ -33,7 +34,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=7300
@@ -52,7 +54,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=7300
@@ -73,7 +76,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=7300
@@ -101,7 +105,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=21B cardinality=7300
@@ -122,7 +127,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=29B cardinality=7300
@@ -142,7 +148,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=7300
@@ -162,7 +169,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=41B cardinality=7300

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e89d7057/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 8257661..6712200 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -6,8 +6,9 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=3/24 files=3 size=60.68KB
-   table stats: 7300 rows total
+   partitions=3/24 files=3 size=60.02KB
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=730
@@ -20,8 +21,9 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=12/24 files=12 size=240.27KB
-   table stats: 7300 rows total
+   partitions=12/24 files=12 size=239.44KB
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=3650
@@ -36,9 +38,10 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=12/24 files=12 size=239.26KB
+   partitions=12/24 files=12 size=239.44KB
    predicates: id < 10
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    parquet dictionary predicates: id < 10
    mem-estimate=80.00MB mem-reservation=0B
@@ -53,8 +56,9 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=6/24 files=6 size=119.04KB
-   table stats: 7300 rows total
+   partitions=6/24 files=6 size=120.46KB
+   stats-rows=3650 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=48.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=1825
@@ -68,10 +72,11 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=0/24 files=0 size=0B
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=0B mem-reservation=0B
-   tuple-ids=0 row-size=97B cardinality=1
+   tuple-ids=0 row-size=97B cardinality=0
 ====
 # Edge case: sample 1%, at least one file should be selected
 select * from functional.alltypes tablesample system(1) repeatable(1234)
@@ -81,8 +86,9 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=1/24 files=1 size=20.36KB
-   table stats: 7300 rows total
+   partitions=1/24 files=1 size=19.95KB
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=73
@@ -97,7 +103,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=1/24 files=1 size=20.36KB
-   table stats: 7300 rows total
+   stats-rows=3650 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=37
@@ -111,7 +118,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=97B cardinality=7300
@@ -124,8 +132,9 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=3/24 files=3 size=22.53KB
-   table stats: unavailable
+   partitions=3/24 files=3 size=22.05KB
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
    column stats: unavailable
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=4B cardinality=unavailable
@@ -146,8 +155,9 @@ PLAN-ROOT SINK
 |  tuple-ids=0 row-size=4B cardinality=10
 |
 |--01:SCAN HDFS [functional.alltypessmall t2]
-|     partitions=1/4 files=1 size=1.57KB
-|     table stats: 100 rows total
+|     partitions=1/4 files=1 size=1.58KB
+|     stats-rows=100 extrapolated-rows=disabled
+|     table stats: rows=100 size=6.32KB
 |     column stats: all
 |     mem-estimate=32.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=4B cardinality=10
@@ -155,7 +165,8 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t1.id
-   table stats: 7300 rows total
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=4B cardinality=7300
@@ -169,8 +180,9 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=3/24 files=3 size=60.68KB
-   table stats: 7300 rows total
+   partitions=3/24 files=3 size=60.02KB
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
    column stats: all
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=4B cardinality=730