You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2017/11/30 01:31:36 UTC

[4/4] impala git commit: IMPALA-5310: Add COMPUTE STATS TABLESAMPLE.

IMPALA-5310: Add COMPUTE STATS TABLESAMPLE.

Adds the TABLESAMPLE clause for COMPUTE STATS.

Syntax:
COMPUTE STATS <table> TABLESAMPLE SYSTEM(<number>) [REPEATABLE(<number>)]

Computes and replaces the table-level row count and total file size,
as well as all table-level column statistics. Existing partition-level
row counts are not modified.
The TABLESAMPLE clause can be used to limit the scanned data volume to
a desired percentage. When sampling, the unmodified results of the
COMPUTE STATS queries are sent to the CatalogServer. There, the stats
are extrapolated before storing them into the HMS so as not to confuse
other engines like Hive/SparkSQL which may rely on the shared HMS
fields being accurate.

Limitations
- Only works for HDFS tables
- TABLESAMPLE is not supported for COMPUTE INCREMENTAL STATS
- TABLESAMPLE requires --enable_stats_extrapolation=true

Changes to EXPLAIN
The stored statistics from the HMS are more clearly displayed under
a 'stored statistics' section. Example:

00:SCAN HDFS [functional.alltypes, RANDOM]
   partitions=24/24 files=24 size=478.45KB
   stored statistics:
     table: rows=7300 size=478.45KB
     partitions: 24/24 rows=7300
     columns: all

Testing:
- added new functional tests
- core/hdfs run passed

Change-Id: I7f3e72471ac563adada4a4156033a85852b7c8b7
Reviewed-on: http://gerrit.cloudera.org:8080/8136
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b3d8a507
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b3d8a507
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b3d8a507

Branch: refs/heads/master
Commit: b3d8a507cb4e5e7887ed55782f5de1d6cdfcd276
Parents: 72ed4fc
Author: Alex Behm <al...@cloudera.com>
Authored: Tue May 23 19:00:13 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 29 22:37:01 2017 +0000

----------------------------------------------------------------------
 be/src/exec/catalog-op-executor.cc              |   17 +-
 common/thrift/JniCatalog.thrift                 |   10 +-
 fe/src/main/cup/sql-parser.cup                  |   10 +-
 .../impala/analysis/ComputeStatsStmt.java       |  212 ++--
 .../org/apache/impala/analysis/TableRef.java    |    2 +-
 .../impala/analysis/TableSampleClause.java      |   13 +-
 .../org/apache/impala/catalog/ColumnStats.java  |    6 +-
 .../org/apache/impala/catalog/HdfsTable.java    |   22 +-
 .../impala/hive/executor/UdfExecutor.java       |    7 +-
 .../impala/planner/DataSourceScanNode.java      |    2 +-
 .../apache/impala/planner/HBaseScanNode.java    |    2 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  207 ++--
 .../org/apache/impala/planner/ScanNode.java     |   57 +-
 .../impala/service/CatalogOpExecutor.java       |  236 +++--
 .../apache/impala/analysis/AnalyzeDDLTest.java  |   35 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   16 +-
 .../org/apache/impala/analysis/ParserTest.java  |   17 +-
 .../impala/hive/executor/UdfExecutorTest.java   |    1 +
 .../queries/PlannerTest/constant-folding.test   |  102 +-
 .../PlannerTest/fk-pk-join-detection.test       |  189 ++--
 .../queries/PlannerTest/max-row-size.test       |   90 +-
 .../PlannerTest/min-max-runtime-filters.test    |   10 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   76 +-
 .../queries/PlannerTest/parquet-filtering.test  |   67 +-
 .../queries/PlannerTest/partition-pruning.test  |    8 +-
 .../PlannerTest/resource-requirements.test      | 1001 ++++++++++--------
 .../PlannerTest/sort-expr-materialization.test  |   64 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  276 ++---
 .../queries/PlannerTest/tablesample.test        |  100 +-
 .../QueryTest/alter-table-set-column-stats.test |    4 +-
 .../QueryTest/compute-stats-incremental.test    |   16 +-
 .../QueryTest/compute-stats-tablesample.test    |  137 +++
 .../queries/QueryTest/compute-stats.test        |   22 +-
 .../queries/QueryTest/explain-level2.test       |   14 +-
 .../queries/QueryTest/explain-level3.test       |   14 +-
 .../hbase-compute-stats-incremental.test        |    2 +-
 .../queries/QueryTest/hbase-compute-stats.test  |    4 +-
 .../queries/QueryTest/show-stats.test           |    4 +-
 .../queries/QueryTest/stats-extrapolation.test  |   80 +-
 .../queries/QueryTest/truncate-table.test       |    4 +-
 .../custom_cluster/test_stats_extrapolation.py  |   43 +
 tests/metadata/test_explain.py                  |    5 +-
 42 files changed, 1952 insertions(+), 1252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 252d345..bb9508b 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -117,6 +117,9 @@ Status CatalogOpExecutor::ExecComputeStats(
     update_stats_params.table_stats.__set_total_file_bytes(
         compute_stats_params.total_file_bytes);
   }
+  if (compute_stats_params.__isset.sample_file_bytes) {
+    update_stats_params.__set_sample_file_bytes(compute_stats_params.sample_file_bytes);
+  }
   // col_stats_schema and col_stats_data will be empty if there was no column stats query.
   if (!col_stats_schema.columns.empty()) {
     if (compute_stats_params.is_incremental) {
@@ -200,7 +203,15 @@ void CatalogOpExecutor::HandleDropDataSource(const TDropDataSourceParams& reques
 void CatalogOpExecutor::SetTableStats(const TTableSchema& tbl_stats_schema,
     const TRowSet& tbl_stats_data, const vector<TPartitionStats>& existing_part_stats,
     TAlterTableUpdateStatsParams* params) {
-  // Accumulate total number of rows in the table.
+  if (tbl_stats_data.rows.size() == 1 && tbl_stats_data.rows[0].colVals.size() == 1) {
+    // Unpartitioned table. Only set table stats, but no partition stats.
+    // The first column is the COUNT(*) expr of the original query.
+    params->table_stats.__set_num_rows(tbl_stats_data.rows[0].colVals[0].i64Val.value);
+    params->__isset.table_stats = true;
+    return;
+  }
+
+  // Accumulate total number of rows in the partitioned table.
   long total_num_rows = 0;
   // Set per-partition stats.
   for (const TRow& row: tbl_stats_data.rows) {
@@ -219,13 +230,13 @@ void CatalogOpExecutor::SetTableStats(const TTableSchema& tbl_stats_schema,
     params->partition_stats[partition_key_vals].stats.__set_num_rows(num_rows);
     total_num_rows += num_rows;
   }
+  params->__isset.partition_stats = true;
 
+  // Add row counts of existing partitions that are not going to be modified.
   for (const TPartitionStats& existing_stats: existing_part_stats) {
     total_num_rows += existing_stats.stats.num_rows;
   }
 
-  params->__isset.partition_stats = true;
-
   // Set per-table stats.
   params->table_stats.__set_num_rows(total_num_rows);
   params->__isset.table_stats = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 37f8088..18f30fe 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -283,7 +283,7 @@ struct TAlterTableUpdateStatsParams {
   2: optional CatalogObjects.TTableStats table_stats
 
   // Partition-level stats. Maps from a list of partition-key values
-  // to its partition stats.
+  // to its partition stats. Only set for partitioned Hdfs tables.
   3: optional map<list<string>, CatalogObjects.TPartitionStats> partition_stats
 
   // Column-level stats. Maps from column name to column stats.
@@ -295,6 +295,10 @@ struct TAlterTableUpdateStatsParams {
 
   // If true, this is the result of an incremental stats computation
   6: optional bool is_incremental
+
+  // Sum of file sizes in the table sample. Only set when TABLESAMPLE was specified.
+  // Only set for tables of type HDFS_TABLE and if is_incremental is false.
+  7: optional i64 sample_file_bytes
 }
 
 // Parameters for ALTER TABLE SET [PARTITION partitionSet] CACHED|UNCACHED
@@ -501,6 +505,10 @@ struct TComputeStatsParams {
   // Sum of file sizes in the table. Only set for tables of type HDFS_TABLE and if
   // is_incremental is false.
   9: optional i64 total_file_bytes
+
+  // Sum of file sizes in the table sample. Only set when TABLESAMPLE was specified.
+  // Only set for tables of type HDFS_TABLE and if is_incremental is false.
+  10: optional i64 sample_file_bytes
 }
 
 // Parameters for CREATE/DROP ROLE

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index f7be00c..a013326 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -1737,12 +1737,12 @@ cascade_val ::=
   ;
 
 compute_stats_stmt ::=
-  KW_COMPUTE KW_STATS table_name:table
-  {: RESULT = new ComputeStatsStmt(table); :}
+  KW_COMPUTE KW_STATS table_name:table opt_tablesample:tblsmpl
+  {: RESULT = ComputeStatsStmt.createStatsStmt(table, tblsmpl); :}
   | KW_COMPUTE KW_INCREMENTAL KW_STATS table_name:table
-  {: RESULT = new ComputeStatsStmt(table, true, null); :}
-  | KW_COMPUTE KW_INCREMENTAL KW_STATS table_name:table partition_set:partitions
-  {: RESULT = new ComputeStatsStmt(table, true, partitions); :}
+  {: RESULT = ComputeStatsStmt.createIncrementalStatsStmt(table, null); :}
+  | KW_COMPUTE KW_INCREMENTAL KW_STATS table_name:table partition_set:parts
+  {: RESULT = ComputeStatsStmt.createIncrementalStatsStmt(table, parts); :}
   ;
 
 drop_stats_stmt ::=

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index fec9ac9..64dc9bf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -18,9 +18,10 @@
 package org.apache.impala.analysis;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.authorization.Privilege;
@@ -28,12 +29,14 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TComputeStatsParams;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TTableName;
@@ -45,14 +48,41 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
- * Represents a COMPUTE STATS <table> and COMPUTE INCREMENTAL STATS <table> [PARTITION
- * <part_spec>] statement for statistics collection. The former statement gathers all
- * table and column stats for a given table and stores them in the Metastore via the
- * CatalogService. All existing stats for that table are replaced and no existing stats
- * are reused. The latter, incremental form, similarly computes stats for the whole table
- * but does so by re-using stats from partitions which have 'valid' statistics. Statistics
- * are 'valid' currently if they exist, in the future they may be expired based on recency
- * etc.
+ * Represents the following statements for statistics collection. Which statistics
+ * are computed and stored depends on the statement type (incremental or not), the
+ * clauses used (sampling, partition spec), as well as whether stats extrapolation
+ * is enabled or not (--enable_stats_extrapolation).
+ *
+ * 1. COMPUTE STATS <table> [TABLESAMPLE SYSTEM(<perc>) [REPEATABLE(<seed>)]]
+ * - Stats extrapolation enabled:
+ *   Computes and replaces the table-level row count and total file size, as well as all
+ *   table-level column statistics. Existing partition-objects and their row count are
+ *   not modified at all. The TABLESAMPLE clause can be used to limit the scanned data
+ *   volume to a desired percentage. When sampling, the results of the COMPUTE STATS
+ *   queries are sent to the CatalogServer. There, the stats are extrapolated before
+ *   storing them into the HMS so as not to confuse other engines like Hive/SparkSQL
+ *   which may rely on the shared HMS fields representing to the whole table and not
+ *   a sample. See {@link CatalogOpExecutor#getExtrapolatedStatsVal}.
+ * - Stats extrapolation disabled:
+ *   Computes and replaces the table-level row count and total file size, the row counts
+ *   for all partitions (if applicable), as well as all table-level column statistics.
+ *   The TABLESAMPLE clause is not supported to simplify implementation and testing. In
+ *   particular, we want to avoid implementing and testing code for updating all HMS
+ *   partitions to set the extrapolated numRows statistic. Altering many partitions is
+ *   expensive and so should be avoided in favor of enabling extrapolation.
+ *
+ * 2. COMPUTE INCREMENTAL STATS <table> [PARTITION <part_spec>]
+ * - Stats extrapolation enabled:
+ *   Not supported for now to keep the logic/code simple.
+ * - Stats extrapolation disabled:
+ *   Computes and replaces the table and partition-level row counts. Computes mergeable
+ *   per-partition column statistics (HLL intermediate state) and stores them in the HMS.
+ *   Computes and replaces the table-level column statistics by merging the
+ *   partition-level column statistics.
+ *   Instead of recomputing those statistics for all partitions, this command reuses
+ *   existing statistics from partitions which already have incremental statistics.
+ *   If a set of partitions is specified, then the incremental statistics for those
+ *   partitions are recomputed (then merged into the table-level statistics).
  *
  * TODO: Allow more coarse/fine grained (db, column)
  * TODO: Compute stats on complex types.
@@ -66,10 +96,15 @@ public class ComputeStatsStmt extends StatementBase {
           "column definitions, e.g., using the result of 'SHOW CREATE TABLE'";
 
   protected final TableName tableName_;
+  protected final TableSampleClause sampleParams_;
 
   // Set during analysis.
   protected Table table_;
 
+  // Total number of bytes in the file sample of the target HDFS table. Set to -1 for
+  // non-HDFS tables or when TABLESAMPLE is not specified.
+  protected long sampleFileBytes_ = -1;
+
   // The Null count is not currently being used in optimization or run-time,
   // and compute stats runs 2x faster in many cases when not counting NULLs.
   private static final boolean COUNT_NULLS = false;
@@ -86,9 +121,10 @@ public class ComputeStatsStmt extends StatementBase {
   private boolean isIncremental_ = false;
 
   // If true, expect the compute stats process to produce output for all partitions in the
-  // target table (only meaningful, therefore, if partitioned). This is always true for
-  // non-incremental computations. If set, expectedPartitions_ will be empty - the point
-  // of this flag is to optimise the case where all partitions are targeted.
+  // target table. In that case, 'expectedPartitions_' will be empty. The point of this
+  // flag is to optimize the case where all partitions are targeted.
+  // False for unpartitioned HDFS tables, non-HDFS tables or when stats extrapolation
+  // is enabled.
   private boolean expectAllPartitions_ = false;
 
   // The list of valid partition statistics that can be used in an incremental computation
@@ -114,26 +150,18 @@ public class ComputeStatsStmt extends StatementBase {
   private static final int MAX_INCREMENTAL_PARTITIONS = 1000;
 
   /**
-   * Constructor for the non-incremental form of COMPUTE STATS.
+   * Should only be constructed via static creation functions.
    */
-  protected ComputeStatsStmt(TableName tableName) {
-    this(tableName, false, null);
-  }
-
-  /**
-   * Constructor for the incremental form of COMPUTE STATS. If isIncremental is true,
-   * statistics will be recomputed incrementally; if false they will be recomputed for the
-   * whole table. The partition set partitionSet can specify a list of partitions whose
-   * stats should be recomputed.
-   */
-  protected ComputeStatsStmt(TableName tableName, boolean isIncremental,
-      PartitionSet partitionSet) {
+  private ComputeStatsStmt(TableName tableName, TableSampleClause sampleParams,
+      boolean isIncremental, PartitionSet partitionSet) {
     Preconditions.checkState(tableName != null && !tableName.isEmpty());
     Preconditions.checkState(isIncremental || partitionSet == null);
-    this.tableName_ = tableName;
-    this.table_ = null;
-    this.isIncremental_ = isIncremental;
-    this.partitionSet_ = partitionSet;
+    Preconditions.checkState(!isIncremental || sampleParams == null);
+    tableName_ = tableName;
+    sampleParams_ = sampleParams;
+    table_ = null;
+    isIncremental_ = isIncremental;
+    partitionSet_ = partitionSet;
     if (partitionSet_ != null) {
       partitionSet_.setTableName(tableName);
       partitionSet_.setPrivilegeRequirement(Privilege.ALTER);
@@ -141,18 +169,21 @@ public class ComputeStatsStmt extends StatementBase {
   }
 
   /**
-   * Utility method for constructing the child queries to add partition columns to both a
-   * select list and a group-by list; the former are wrapped in a cast to a string.
+   * Returns a stmt for COMPUTE STATS. The optional 'sampleParams' indicates whether the
+   * stats should be computed with table sampling.
    */
-  private void addPartitionCols(HdfsTable table, List<String> selectList,
-      List<String> groupByCols) {
-    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
-      String colRefSql = ToSqlUtils.getIdentSql(table.getColumns().get(i).getName());
-      groupByCols.add(colRefSql);
-      // For the select list, wrap the group by columns in a cast to string because
-      // the Metastore stores them as strings.
-      selectList.add(colRefSql);
-    }
+  public static ComputeStatsStmt createStatsStmt(TableName tableName,
+      TableSampleClause sampleParams) {
+    return new ComputeStatsStmt(tableName, sampleParams, false, null);
+  }
+
+  /**
+   * Returns a stmt for COMPUTE INCREMENTAL STATS. The optional 'partitioSet' specifies a
+   * set of partitions whose stats should be computed.
+   */
+  public static ComputeStatsStmt createIncrementalStatsStmt(TableName tableName,
+      PartitionSet partitionSet) {
+    return new ComputeStatsStmt(tableName, null, true, partitionSet);
   }
 
   private List<String> getBaseColumnStatsQuerySelectList(Analyzer analyzer) {
@@ -161,7 +192,6 @@ public class ComputeStatsStmt extends StatementBase {
     // cannot store them as part of the non-partition column stats. For HBase tables,
     // include the single clustering column (the row key).
     int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols();
-    final String ndvUda = isIncremental_ ? "NDV_NO_FINALIZE" : "NDV";
 
     for (int i = startColIdx; i < table_.getColumns().size(); ++i) {
       Column c = table_.getColumns().get(i);
@@ -170,7 +200,11 @@ public class ComputeStatsStmt extends StatementBase {
       // NDV approximation function. Add explicit alias for later identification when
       // updating the Metastore.
       String colRefSql = ToSqlUtils.getIdentSql(c.getName());
-      columnStatsSelectList.add(ndvUda + "(" + colRefSql + ") AS " + colRefSql);
+      if (isIncremental_) {
+        columnStatsSelectList.add("NDV_NO_FINALIZE(" + colRefSql + ") AS " + colRefSql);
+      } else {
+        columnStatsSelectList.add("NDV(" + colRefSql + ") AS " + colRefSql);
+      }
 
       if (COUNT_NULLS) {
         // Count the number of NULL values.
@@ -266,12 +300,10 @@ public class ComputeStatsStmt extends StatementBase {
       isIncremental_ = false;
     }
 
-    // Ensure that we write an entry for every partition if this isn't incremental
-    if (!isIncremental_) expectAllPartitions_ = true;
-
     HdfsTable hdfsTable = null;
     if (table_ instanceof HdfsTable) {
       hdfsTable = (HdfsTable)table_;
+      if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable);
       if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
           partitionSet_ != null) {
         throw new AnalysisException(String.format(
@@ -385,6 +417,10 @@ public class ComputeStatsStmt extends StatementBase {
         analyzer.addWarning("No partitions selected for incremental stats update");
         return;
       }
+    } else {
+      // Not computing incremental stats.
+      expectAllPartitions_ = !(table_ instanceof HdfsTable) ||
+          !BackendConfig.INSTANCE.enableStatsExtrapolation();
     }
 
     if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) {
@@ -396,31 +432,31 @@ public class ComputeStatsStmt extends StatementBase {
       validPartStats_.clear();
     }
 
-    List<String> groupByCols = Lists.newArrayList();
-    List<String> partitionColsSelectList = Lists.newArrayList();
-    // Only add group by clause for HdfsTables.
-    if (hdfsTable != null) {
-      if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable);
-      addPartitionCols(hdfsTable, partitionColsSelectList, groupByCols);
-    }
 
     // Query for getting the per-partition row count and the total row count.
     StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT ");
-    List<String> tableStatsSelectList = Lists.newArrayList();
-    tableStatsSelectList.add("COUNT(*)");
-
-    tableStatsSelectList.addAll(partitionColsSelectList);
+    List<String> tableStatsSelectList = Lists.newArrayList("COUNT(*)");
+    // Add group by columns for incremental stats or with extrapolation disabled.
+    List<String> groupByCols = Lists.newArrayList();
+    if (!updateTableStatsOnly()) {
+      for (Column partCol: hdfsTable.getClusteringColumns()) {
+        groupByCols.add(ToSqlUtils.getIdentSql(partCol.getName()));
+      }
+      tableStatsSelectList.addAll(groupByCols);
+    }
     tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList));
-    tableStatsQueryBuilder.append(" FROM " + tableName_.toSql());
+    // Tablesample clause to be used for all child queries.
+    String tableSampleSql = analyzeTableSampleClause(analyzer);
+    tableStatsQueryBuilder.append(" FROM " + tableName_.toSql() + tableSampleSql);
 
     // Query for getting the per-column NDVs and number of NULLs.
     List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer);
 
-    if (isIncremental_) columnStatsSelectList.addAll(partitionColsSelectList);
+    if (isIncremental_) columnStatsSelectList.addAll(groupByCols);
 
     StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT ");
     columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList));
-    columnStatsQueryBuilder.append(" FROM " + tableName_.toSql());
+    columnStatsQueryBuilder.append(" FROM " + tableName_.toSql() + tableSampleSql);
 
     // Add the WHERE clause to filter out partitions that we don't want to compute
     // incremental stats for. While this is a win in most situations, we would like to
@@ -459,6 +495,43 @@ public class ComputeStatsStmt extends StatementBase {
   }
 
   /**
+   * Analyzes the TABLESAMPLE clause and computes the sample to set 'sampleFileBytes_'.
+   * Returns the TABLESAMPLE SQL to be used for all child queries or an empty string if
+   * not sampling. If sampling, the returned SQL includes a fixed random seed so all
+   * child queries generate a consistent sample, even if the user did not originally
+   * specify REPEATABLE.
+   * No-op if this statement has no TABLESAMPLE clause.
+   */
+  private String analyzeTableSampleClause(Analyzer analyzer) throws AnalysisException {
+    if (sampleParams_ == null) return "";
+    if (!(table_ instanceof HdfsTable)) {
+      throw new AnalysisException("TABLESAMPLE is only supported on HDFS tables.");
+    }
+    if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) {
+      throw new AnalysisException(
+          "COMPUTE STATS TABLESAMPLE requires --enable_stats_extrapolation=true. " +
+          "Stats extrapolation is currently disabled.");
+    }
+    sampleParams_.analyze(analyzer);
+    long sampleSeed;
+    if (sampleParams_.hasRandomSeed()) {
+      sampleSeed = sampleParams_.getRandomSeed();
+    } else {
+      sampleSeed = System.currentTimeMillis();
+    }
+
+    // Compute the sample of files and set 'sampleFileBytes_'.
+    HdfsTable hdfsTable = (HdfsTable) table_;
+    Map<Long, List<FileDescriptor>> sample = hdfsTable.getFilesSample(
+        hdfsTable.getPartitions(), sampleParams_.getPercentBytes(), sampleSeed);
+    sampleFileBytes_ = 0;
+    for (List<FileDescriptor> fds: sample.values()) {
+      for (FileDescriptor fd: fds) sampleFileBytes_ += fd.getFileLength();
+    }
+    return " " + sampleParams_.toSql(sampleSeed);
+  }
+
+  /**
    * Checks whether the column definitions from the CREATE TABLE stmt match the columns
    * in the Avro schema. If there is a mismatch, then COMPUTE STATS cannot update the
    * statistics in the Metastore's backend DB due to HIVE-6308. Throws an
@@ -526,6 +599,15 @@ public class ComputeStatsStmt extends StatementBase {
   }
 
   /**
+   * Returns true if we are only updating statistics at the table level and not at
+   * the partition level.
+   */
+  private boolean updateTableStatsOnly() {
+    if (!(table_ instanceof HdfsTable)) return true;
+    return !isIncremental_ && BackendConfig.INSTANCE.enableStatsExtrapolation();
+  }
+
+  /**
    * Returns true if the given column should be ignored for the purpose of computing
    * column stats. Columns with an invalid/unsupported/complex type are ignored.
    * For example, complex types in an HBase-backed table will appear as invalid types.
@@ -559,7 +641,9 @@ public class ComputeStatsStmt extends StatementBase {
   @Override
   public String toSql() {
     if (!isIncremental_) {
-      return "COMPUTE STATS " + tableName_.toSql();
+      String tblsmpl = "";
+      if (sampleParams_ != null) tblsmpl = " " + sampleParams_.toSql();
+      return "COMPUTE STATS " + tableName_.toSql() + tblsmpl;
     } else {
       return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() +
           partitionSet_ == null ? "" : partitionSet_.toSql();
@@ -575,7 +659,6 @@ public class ComputeStatsStmt extends StatementBase {
     } else {
       params.setCol_stats_queryIsSet(false);
     }
-
     params.setIs_incremental(isIncremental_);
     params.setExisting_part_stats(validPartStats_);
     params.setExpect_all_partitions(expectAllPartitions_);
@@ -583,8 +666,13 @@ public class ComputeStatsStmt extends StatementBase {
     if (isIncremental_) {
       params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols());
     }
-    if (table_ instanceof HdfsTable && !isIncremental_) {
+
+    if (table_ instanceof HdfsTable) {
       params.setTotal_file_bytes(((HdfsTable)table_).getTotalHdfsBytes());
+      if (sampleParams_ != null) {
+        Preconditions.checkState(sampleFileBytes_ >= 0);
+        if (sampleFileBytes_ != -1) params.setSample_file_bytes(sampleFileBytes_);
+      }
     }
     return params;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/analysis/TableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index 4dc9e34..80cdea8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -355,7 +355,7 @@ public class TableRef implements ParseNode {
     if (!(this instanceof BaseTableRef)
         || !(resolvedPath_.destTable() instanceof HdfsTable)) {
       throw new AnalysisException(
-          "TABLESAMPLE is only supported on HDFS base tables: " + getUniqueAlias());
+          "TABLESAMPLE is only supported on HDFS tables: " + getUniqueAlias());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java b/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java
index 5e19ce8..df388be 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java
@@ -17,9 +17,10 @@
 
 package org.apache.impala.analysis;
 
-import com.google.common.base.Preconditions;
 import org.apache.impala.common.AnalysisException;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Represents a TABLESAMPLE clause.
  *
@@ -62,10 +63,16 @@ public class TableSampleClause implements ParseNode {
   }
 
   @Override
-  public String toSql() {
+  public String toSql() { return toSql(randomSeed_); }
+
+  /**
+   * Prints the SQL of this TABLESAMPLE clause. The optional REPEATABLE clause is
+   * included if 'randomSeed' is non-NULL.
+   */
+  public String toSql(Long randomSeed) {
     StringBuilder builder = new StringBuilder();
     builder.append("TABLESAMPLE SYSTEM(" + percentBytes_ + ")");
-    if (randomSeed_ != null) builder.append(" REPEATABLE(" + randomSeed_ + ")");
+    if (randomSeed != null) builder.append(" REPEATABLE(" + randomSeed + ")");
     return builder.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
index ec4c0f5..dfaaf66 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
@@ -26,12 +26,10 @@ import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.thrift.TColumnStats;
+
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
@@ -157,7 +155,7 @@ public class ColumnStats {
 
   public long getNumDistinctValues() { return numDistinctValues_; }
   public void setNumDistinctValues(long numDistinctValues) {
-    this.numDistinctValues_ = numDistinctValues;
+    numDistinctValues_ = numDistinctValues;
   }
   public void setNumNulls(long numNulls) { numNulls_ = numNulls; }
   public double getAvgSerializedSize() { return avgSerializedSize_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index ab1511b..c2417f6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -30,11 +30,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
-
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -144,23 +143,23 @@ public class HdfsTable extends Table {
   // Array of sorted maps storing the association between partition values and
   // partition ids. There is one sorted map per partition key. It is only populated if
   // this table object is stored in ImpaladCatalog.
-  private ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
+  private final ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
       Lists.newArrayList();
 
   // Array of partition id sets that correspond to partitions with null values
   // in the partition keys; one set per partition key. It is not populated if the table is
   // stored in the catalog server.
-  private ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
+  private final ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
 
   // Map of partition ids to HdfsPartitions.
-  private HashMap<Long, HdfsPartition> partitionMap_ = Maps.newHashMap();
+  private final HashMap<Long, HdfsPartition> partitionMap_ = Maps.newHashMap();
 
   // Map of partition name to HdfsPartition object. Used for speeding up
   // table metadata loading.
-  private HashMap<String, HdfsPartition> nameToPartitionMap_ = Maps.newHashMap();
+  private final HashMap<String, HdfsPartition> nameToPartitionMap_ = Maps.newHashMap();
 
   // Store all the partition ids of an HdfsTable.
-  private HashSet<Long> partitionIds_ = Sets.newHashSet();
+  private final HashSet<Long> partitionIds_ = Sets.newHashSet();
 
   // Estimate (in bytes) of the incremental stats size per column per partition
   public static final long STATS_SIZE_PER_COLUMN_BYTES = 400;
@@ -218,7 +217,7 @@ public class HdfsTable extends Table {
   // File/Block metadata loading stats for a single HDFS path.
   private class FileMetadataLoadStats {
     // Path corresponding to this metadata load request.
-    private Path hdfsPath;
+    private final Path hdfsPath;
 
     // Number of files for which the metadata was loaded.
     public int loadedFiles = 0;
@@ -1377,6 +1376,7 @@ public class HdfsTable extends Table {
     return partsByPath;
   }
 
+  @Override
   public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
     super.setTableStats(msTbl);
     // For unpartitioned tables set the numRows in its partitions
@@ -2065,8 +2065,8 @@ public class HdfsTable extends Table {
    * The given 'randomSeed' is used for random number generation.
    * The 'percentBytes' parameter must be between 0 and 100.
    */
-  public Map<Long, List<FileDescriptor>> getFilesSample(List<HdfsPartition> inputParts,
-      long percentBytes, long randomSeed) {
+  public Map<Long, List<FileDescriptor>> getFilesSample(
+      Collection<HdfsPartition> inputParts, long percentBytes, long randomSeed) {
     Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
 
     // Conservative max size for Java arrays. The actual maximum varies

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java b/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java
index 954dfb0..cfe101b 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/UdfExecutor.java
@@ -37,17 +37,16 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
 import org.apache.impala.catalog.Type;
-import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.thrift.THiveUdfExecutorCtorParams;
 import org.apache.impala.thrift.TPrimitiveType;
 import org.apache.impala.util.UnsafeUtil;
+import org.apache.log4j.Logger;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index e6679da..5615b6e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -357,7 +357,7 @@ public class DataSourceScanNode extends ScanNode {
 
     // Add table and column stats in verbose mode.
     if (detailLevel == TExplainLevel.VERBOSE) {
-      output.append(getStatsExplainString(prefix, detailLevel));
+      output.append(getStatsExplainString(prefix));
       output.append("\n");
     }
     return output.toString();

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index d2e47ad..c0b81ca 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -443,7 +443,7 @@ public class HBaseScanNode extends ScanNode {
       }
     }
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(getStatsExplainString(detailPrefix, detailLevel));
+      output.append(getStatsExplainString(detailPrefix));
       output.append("\n");
     }
     return output.toString();

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 1e0dd72..964119a 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -49,9 +49,9 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.FileSystemUtil;
@@ -74,6 +74,7 @@ import org.apache.impala.thrift.TReplicaPreference;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.MembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,18 +116,20 @@ import com.google.common.collect.Sets;
 public class HdfsScanNode extends ScanNode {
   private final static Logger LOG = LoggerFactory.getLogger(HdfsScanNode.class);
 
+  private static final Configuration CONF = new Configuration();
+
   // Maximum number of I/O buffers per thread executing this scan.
   // TODO: it's unclear how this was chosen - this seems like a very high number
-  private final static long MAX_IO_BUFFERS_PER_THREAD = 10;
+  private static final long MAX_IO_BUFFERS_PER_THREAD = 10;
 
   // Maximum number of thread tokens per core that may be used to spin up extra scanner
   // threads. Corresponds to the default value of --num_threads_per_core in the backend.
-  private final static int MAX_THREAD_TOKENS_PER_CORE = 3;
+  private static final int MAX_THREAD_TOKENS_PER_CORE = 3;
 
   // Factor capturing the worst-case deviation from a uniform distribution of scan ranges
   // among nodes. The factor of 1.2 means that a particular node may have 20% more
   // scan ranges than would have been estimated assuming a uniform distribution.
-  private final static double SCAN_RANGE_SKEW_FACTOR = 1.2;
+  private static final double SCAN_RANGE_SKEW_FACTOR = 1.2;
 
   // The minimum amount of memory we estimate a scan will use. The number is
   // derived experimentally: running metadata-only Parquet count(*) scans on TPC-H
@@ -155,9 +158,10 @@ public class HdfsScanNode extends ScanNode {
   private long totalFiles_ = 0;
   private long totalBytes_ = 0;
 
-  // Input cardinality based on the partition row counts or extrapolation.
-  // -1 if invalid.
-  private long statsNumRows_ = -1;
+  // Input cardinality based on the partition row counts or extrapolation. -1 if invalid.
+  // Both values can be valid to report them in the explain plan, but only one of them is
+  // used for determining the scan cardinality.
+  private long partitionNumRows_ = -1;
   private long extrapolatedNumRows_ = -1;
 
   // True if this scan node should use the MT implementation in the backend.
@@ -180,9 +184,12 @@ public class HdfsScanNode extends ScanNode {
 
   // Map from SlotIds to indices in PlanNodes.conjuncts_ that are eligible for
   // dictionary filtering
-  private Map<Integer, List<Integer>> dictionaryFilterConjuncts_ =
+  private final Map<Integer, List<Integer>> dictionaryFilterConjuncts_ =
       Maps.newLinkedHashMap();
 
+  // Number of partitions that have the row count statistic.
+  private int numPartitionsWithNumRows_ = 0;
+
   // Indicates corrupt table stats based on the number of non-empty scan ranges and
   // numRows set to 0. Set in computeStats().
   private boolean hasCorruptTableStats_;
@@ -197,16 +204,13 @@ public class HdfsScanNode extends ScanNode {
   private int numFilesNoDiskIds_ = 0;
   private int numPartitionsNoDiskIds_ = 0;
 
-  private static final Configuration CONF = new Configuration();
-
-
   // List of conjuncts for min/max values of parquet::Statistics, that are used to skip
   // data when scanning Parquet files.
-  private List<Expr> minMaxConjuncts_ = Lists.newArrayList();
+  private final List<Expr> minMaxConjuncts_ = Lists.newArrayList();
 
   // List of PlanNode conjuncts that have been transformed into conjuncts in
   // 'minMaxConjuncts_'.
-  private List<Expr> minMaxOriginalConjuncts_ = Lists.newArrayList();
+  private final List<Expr> minMaxOriginalConjuncts_ = Lists.newArrayList();
 
   // Tuple that is used to materialize statistics when scanning Parquet files. For each
   // column it can contain 0, 1, or 2 slots, depending on whether the column needs to be
@@ -649,7 +653,6 @@ public class HdfsScanNode extends ScanNode {
    */
   private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer)
       throws ImpalaRuntimeException {
-    List<HdfsPartition> partitions = partitions_;
     Map<Long, List<FileDescriptor>> sampledFiles = null;
     if (sampleParams_ != null) {
       long percentBytes = sampleParams_.getPercentBytes();
@@ -660,18 +663,12 @@ public class HdfsScanNode extends ScanNode {
         randomSeed = System.currentTimeMillis();
       }
       sampledFiles = tbl_.getFilesSample(partitions_, percentBytes, randomSeed);
-      if (sampledFiles.size() != partitions_.size()) {
-        partitions = Lists.newArrayListWithCapacity(sampledFiles.size());
-        for (Long partId: sampledFiles.keySet()) {
-          partitions.add(tbl_.getPartitionMap().get(partId));
-        }
-      }
     }
 
     long maxScanRangeLength = analyzer.getQueryCtx().client_request.getQuery_options()
         .getMax_scan_range_length();
     scanRanges_ = Lists.newArrayList();
-    numPartitions_ = partitions.size();
+    numPartitions_ = (sampledFiles != null) ? sampledFiles.size() : partitions_.size();
     totalFiles_ = 0;
     totalBytes_ = 0;
     Set<HdfsFileFormat> fileFormats = Sets.newHashSet();
@@ -765,11 +762,21 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Computes and sets the following members.
-   * inputCardinality_, cardinality_, numHosts_, statsNumRows_, extrapolatedNumRows_,
-   * numPartitionsMissingStats_, and hasCorruptTableStats_.
+   * Computes the average row size, input and output cardinalities, and estimates the
+   * number of nodes.
+   * Requires that computeScanRangeLocations() has been called.
+   */
+  @Override
+  public void computeStats(Analyzer analyzer) {
+    Preconditions.checkNotNull(scanRanges_);
+    super.computeStats(analyzer);
+    computeCardinalities();
+    computeNumNodes(analyzer, cardinality_);
+  }
+
+  /**
+   * Computes and sets the input and output cardinalities.
    *
-   * Row count extrapolation
    * If available, table-level row count and file bytes statistics are used for
    * extrapolating the input cardinality (before conjuncts). The extrapolation is based
    * on the total number of bytes to be scanned and is intended to address the following
@@ -779,77 +786,22 @@ public class HdfsScanNode extends ScanNode {
    * since the last stats collection.
    * Otherwise, the input cardinality is based on the per-partition row count stats
    * and/or the table-level row count stats, depending on which of those are available.
-   * Partitions without stats are ignored.
-   */
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("collecting partitions for table " + tbl_.getName());
-    }
-    numPartitionsMissingStats_ = 0;
-    statsNumRows_ = -1;
-    if (tbl_.getNumClusteringCols() == 0) {
-      statsNumRows_ = tbl_.getNumRows();
-      if (statsNumRows_ < -1 || (statsNumRows_ == 0 && tbl_.getTotalHdfsBytes() > 0)) {
-        hasCorruptTableStats_ = true;
-      }
-      if (!partitions_.isEmpty()) {
-        Preconditions.checkState(partitions_.size() == 1);
-      }
-    } else {
-      for (HdfsPartition p: partitions_) {
-        // Check for corrupt table stats
-        if (p.getNumRows() < -1  || (p.getNumRows() == 0 && p.getSize() > 0))  {
-          hasCorruptTableStats_ = true;
-        }
-        // ignore partitions with missing stats in the hope they don't matter
-        // enough to change the planning outcome
-        if (p.getNumRows() > -1) {
-          if (statsNumRows_ == -1) statsNumRows_ = 0;
-          statsNumRows_ = checkedAdd(statsNumRows_, p.getNumRows());
-        } else {
-          ++numPartitionsMissingStats_;
-        }
-      }
-    }
-    extrapolatedNumRows_ = tbl_.getExtrapolatedNumRows(totalBytes_);
-    computeCardinalities();
-    computeNumNodes(analyzer, cardinality_);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("computeStats HdfsScan: #nodes=" + Integer.toString(numNodes_));
-    }
-  }
-
-  /**
-   * Computes and sets the input and output cardinalities, choosing between the
-   * 'extrapolatedNumRows_' and 'statsNumRows_'.
    * Adjusts the output cardinality based on the scan conjuncts and table sampling.
+   *
+   * Sets these members:
+   * extrapolatedNumRows_, inputCardinality_, cardinality_
    */
   private void computeCardinalities() {
-    Preconditions.checkState(statsNumRows_ >= -1 || hasCorruptTableStats_);
-    Preconditions.checkState(extrapolatedNumRows_ >= -1);
-
-    if (totalBytes_ == 0) {
-      // Nothing to scan. Definitely a cardinality of 0.
-      inputCardinality_ = 0;
-      cardinality_ = 0;
-      return;
-    }
-
     // Choose between the extrapolated row count and the one based on stored stats.
+    extrapolatedNumRows_ = tbl_.getExtrapolatedNumRows(totalBytes_);
+    long statsNumRows = getStatsNumRows();
     if (extrapolatedNumRows_ != -1) {
       // The extrapolated row count is based on the 'totalBytes_' which already accounts
       // for table sampling, so no additional adjustment for sampling is necessary.
       cardinality_ = extrapolatedNumRows_;
     } else {
-      if (!partitions_.isEmpty() && numPartitionsMissingStats_ == partitions_.size()) {
-        // if none of the partitions knew its number of rows, and extrapolation was
-        // not possible, we fall back on the table stats
-        cardinality_ = tbl_.getNumRows();
-      } else {
-        cardinality_ = statsNumRows_;
-      }
+      // Set the cardinality based on table or partition stats.
+      cardinality_ = statsNumRows;
       // Adjust the cardinality based on table sampling.
       if (sampleParams_ != null && cardinality_ != -1) {
         double fracPercBytes = (double) sampleParams_.getPercentBytes() / 100;
@@ -858,6 +810,14 @@ public class HdfsScanNode extends ScanNode {
       }
     }
 
+    // Checked after the block above to first collect information for the explain output.
+    if (totalBytes_ == 0) {
+      // Nothing to scan. Definitely a cardinality of 0.
+      inputCardinality_ = 0;
+      cardinality_ = 0;
+      return;
+    }
+
     // Adjust cardinality for all collections referenced along the tuple's path.
     if (cardinality_ != -1) {
       for (Type t: desc_.getPath().getMatchedTypes()) {
@@ -888,6 +848,46 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
+   * Computes and returns the number of rows scanned based on the per-partition row count
+   * stats and/or the table-level row count stats, depending on which of those are
+   * available, and whether the table is partitioned. Partitions without stats are
+   * ignored as long as there is at least one partition with stats. Otherwise,
+   * we fall back to table-level stats even for partitioned tables.
+   *
+   * Sets these members:
+   * numPartitionsWithNumRows_, partitionNumRows_, hasCorruptTableStats_.
+   */
+  private long getStatsNumRows() {
+    numPartitionsWithNumRows_ = 0;
+    partitionNumRows_ = -1;
+    hasCorruptTableStats_ = false;
+    if (tbl_.getNumClusteringCols() > 0) {
+      for (HdfsPartition p: partitions_) {
+        // Check for corrupt partition stats
+        long partNumRows = p.getNumRows();
+        if (partNumRows < -1  || (partNumRows == 0 && p.getSize() > 0))  {
+          hasCorruptTableStats_ = true;
+        }
+        // Ignore partitions with missing stats in the hope they don't matter
+        // enough to change the planning outcome.
+        if (partNumRows > -1) {
+          if (partitionNumRows_ == -1) partitionNumRows_ = 0;
+          partitionNumRows_ = checkedAdd(partitionNumRows_, partNumRows);
+          ++numPartitionsWithNumRows_;
+        }
+      }
+      if (numPartitionsWithNumRows_ > 0) return partitionNumRows_;
+    }
+    // Table is unpartitioned or the table is partitioned but no partitions have stats.
+    // Set cardinality based on table-level stats.
+    long numRows = tbl_.getNumRows();
+    if (numRows < -1 || (numRows == 0 && tbl_.getTotalHdfsBytes() > 0)) {
+      hasCorruptTableStats_ = true;
+    }
+    return numRows;
+  }
+
+  /**
    * Estimate the number of impalad nodes that this scan node will execute on (which is
    * ultimately determined by the scheduling done by the backend's Scheduler).
    * Assume that scan ranges that can be scheduled locally will be, and that scan
@@ -1016,18 +1016,15 @@ public class HdfsScanNode extends ScanNode {
       }
     }
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+      output.append(getStatsExplainString(detailPrefix));
+      output.append("\n");
       String extrapRows = String.valueOf(extrapolatedNumRows_);
       if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) {
         extrapRows = "disabled";
       } else if (extrapolatedNumRows_ == -1) {
         extrapRows = "unavailable";
       }
-      String statsRows = String.valueOf(statsNumRows_);
-      if (statsNumRows_ == -1) statsRows = "unavailable";
-      output.append(String.format(
-          "%sstats-rows=%s extrapolated-rows=%s", detailPrefix, statsRows, extrapRows));
-      output.append("\n");
-      output.append(getStatsExplainString(detailPrefix, detailLevel));
+      output.append(String.format("%sextrapolated-rows=%s", detailPrefix, extrapRows));
       output.append("\n");
       if (numScanRangesNoDiskIds_ > 0) {
         output.append(String.format("%smissing disk ids: " +
@@ -1058,6 +1055,26 @@ public class HdfsScanNode extends ScanNode {
   }
 
   @Override
+  protected String getTableStatsExplainString(String prefix) {
+    StringBuilder output = new StringBuilder();
+    TTableStats tblStats = desc_.getTable().getTTableStats();
+    String numRows = String.valueOf(tblStats.num_rows);
+    if (tblStats.num_rows == -1) numRows = "unavailable";
+    String totalBytes = PrintUtils.printBytes(tblStats.total_file_bytes);
+    if (tblStats.total_file_bytes == -1) totalBytes = "unavailable";
+    output.append(String.format("%stable: rows=%s size=%s",
+        prefix, numRows, totalBytes));
+    if (tbl_.getNumClusteringCols() > 0) {
+      output.append("\n");
+      String partNumRows = String.valueOf(partitionNumRows_);
+      if (partitionNumRows_ == -1) partNumRows = "unavailable";
+      output.append(String.format("%spartitions: %s/%s rows=%s",
+          prefix, numPartitionsWithNumRows_, partitions_.size(), partNumRows));
+    }
+    return output.toString();
+  }
+
+  @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges.");
     if (scanRanges_.isEmpty()) {
@@ -1140,6 +1157,10 @@ public class HdfsScanNode extends ScanNode {
   @Override
   public boolean isTableMissingTableStats() {
     if (extrapolatedNumRows_ >= 0) return false;
+    if (tbl_.getNumClusteringCols() > 0
+        && numPartitionsWithNumRows_ != partitions_.size()) {
+      return true;
+    }
     return super.isTableMissingTableStats();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index d6b7813..eea9c50 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -23,11 +23,8 @@ import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.NotImplementedException;
-import org.apache.impala.common.PrintUtils;
-import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TTableStats;
@@ -46,9 +43,6 @@ abstract public class ScanNode extends PlanNode {
   // Total number of rows this node is expected to process
   protected long inputCardinality_ = -1;
 
-  // Counter indicating if partitions have missing statistics
-  protected int numPartitionsMissingStats_ = 0;
-
   // List of scan-range locations. Populated in init().
   protected List<TScanRangeLocationList> scanRanges_;
 
@@ -103,29 +97,26 @@ abstract public class ScanNode extends PlanNode {
   }
 
   /**
-   * Returns the explain string for table and columns stats to be included into the
-   * a ScanNode's explain string. The given prefix is prepended to each of the lines.
-   * The prefix is used for proper formatting when the string returned by this method
-   * is embedded in a query's explain plan.
+   * Returns the explain string for table stats to be included into this ScanNode's
+   * explain string. The prefix is prepended to each returned line for proper formatting
+   * when the string returned by this method is embedded in a query's explain plan.
    */
-  protected String getStatsExplainString(String prefix, TExplainLevel detailLevel) {
+  protected String getTableStatsExplainString(String prefix) {
     StringBuilder output = new StringBuilder();
-    // Table stats.
     TTableStats tblStats = desc_.getTable().getTTableStats();
     String numRows = String.valueOf(tblStats.num_rows);
     if (tblStats.num_rows == -1) numRows = "unavailable";
-    output.append(prefix + "table stats: rows=" + numRows);
-    if (desc_.getTable() instanceof HdfsTable) {
-      String totalBytes = PrintUtils.printBytes(tblStats.total_file_bytes);
-      if (tblStats.total_file_bytes == -1) totalBytes = "unavailable";
-      output.append(" size=" + totalBytes);
-    }
-    if (tblStats.num_rows != -1 && numPartitionsMissingStats_ > 0) {
-      output.append(" (" + numPartitionsMissingStats_ + " partition(s) missing stats)");
-    }
-    output.append("\n");
+    output.append(prefix + "table: rows=" + numRows);
+    return output.toString();
+  }
 
-    // Column stats.
+  /**
+   * Returns the explain string for column stats to be included into this ScanNode's
+   * explain string. The prefix is prepended to each returned line for proper formatting
+   * when the string returned by this method is embedded in a query's explain plan.
+   */
+  protected String getColumnStatsExplainString(String prefix) {
+    StringBuilder output = new StringBuilder();
     List<String> columnsMissingStats = Lists.newArrayList();
     for (SlotDescriptor slot: desc_.getSlots()) {
       if (!slot.getStats().hasStats() && slot.getColumn() != null) {
@@ -133,9 +124,9 @@ abstract public class ScanNode extends PlanNode {
       }
     }
     if (columnsMissingStats.isEmpty()) {
-      output.append(prefix + "column stats: all");
+      output.append(prefix + "columns: all");
     } else if (columnsMissingStats.size() == desc_.getSlots().size()) {
-      output.append(prefix + "column stats: unavailable");
+      output.append(prefix + "columns: unavailable");
     } else {
       output.append(String.format("%scolumns missing stats: %s", prefix,
           Joiner.on(", ").join(columnsMissingStats)));
@@ -144,6 +135,19 @@ abstract public class ScanNode extends PlanNode {
   }
 
   /**
+   * Combines the explain string for table and column stats.
+   */
+  protected String getStatsExplainString(String prefix) {
+    StringBuilder output = new StringBuilder(prefix);
+    output.append("stored statistics:\n");
+    prefix = prefix + "  ";
+    output.append(getTableStatsExplainString(prefix));
+    output.append("\n");
+    output.append(getColumnStatsExplainString(prefix));
+    return output.toString();
+  }
+
+  /**
    * Returns true if the table underlying this scan is missing table stats
    * or column stats relevant to this scan node.
    */
@@ -152,8 +156,7 @@ abstract public class ScanNode extends PlanNode {
   }
 
   public boolean isTableMissingTableStats() {
-    if (desc_.getTable().getNumRows() == -1) return true;
-    return numPartitionsMissingStats_ > 0;
+    return desc_.getTable().getNumRows() == -1;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index dcec430..ba42583 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -61,7 +61,6 @@ import org.apache.impala.catalog.ColumnNotFoundException;
 import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.Function;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
@@ -156,6 +155,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.math.LongMath;
 
 /**
  * Class used to execute Catalog Operations, including DDL and refresh/invalidate
@@ -472,7 +472,7 @@ public class CatalogOpExecutor {
         case UPDATE_STATS:
           Preconditions.checkState(params.isSetUpdate_stats_params());
           Reference<Long> numUpdatedColumns = new Reference<>(0L);
-          alterTableUpdateStats(tbl, params.getUpdate_stats_params(), response,
+          alterTableUpdateStats(tbl, params.getUpdate_stats_params(),
               numUpdatedPartitions, numUpdatedColumns);
           reloadTableSchema = true;
           resultColVal.setString_val("Updated " + numUpdatedPartitions.getRef() +
@@ -607,15 +607,6 @@ public class CatalogOpExecutor {
     result.setVersion(updatedCatalogObject.getCatalog_version());
   }
 
-  /**
-   * Creates a new HdfsPartition object and adds it to the corresponding HdfsTable.
-   * Does not create the object in the Hive metastore.
-   */
-  private Table addHdfsPartition(Table tbl, Partition partition)
-      throws CatalogException {
-    return addHdfsPartitions(tbl, Lists.newArrayList(partition));
-  }
-
   private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
       throws CatalogException {
     Preconditions.checkNotNull(tbl);
@@ -684,95 +675,86 @@ public class CatalogOpExecutor {
   /**
    * Alters an existing table's table and/or column statistics. Partitions are updated
    * in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'.
+   * This function is used by COMPUTE STATS, COMPUTE INCREMENTAL STATS and
+   * ALTER TABLE SET COLUMN STATS.
+   * Returns the number of updated partitions and columns in 'numUpdatedPartitions'
+   * and 'numUpdatedColumns', respectively.
    */
   private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params,
-      TDdlExecResponse resp, Reference<Long> numUpdatedPartitions,
-      Reference<Long> numUpdatedColumns) throws ImpalaException {
+      Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns)
+      throws ImpalaException {
     Preconditions.checkState(table.getLock().isHeldByCurrentThread());
-    if (params.isSetTable_stats()) {
-      // Updating table and column stats via COMPUTE STATS.
-      Preconditions.checkState(
-          params.isSetPartition_stats() && params.isSetTable_stats());
-    } else {
-      // Only changing column stats via ALTER TABLE SET COLUMN STATS.
-      Preconditions.checkState(params.isSetColumn_stats());
-    }
+    Preconditions.checkState(params.isSetTable_stats() || params.isSetColumn_stats());
 
     TableName tableName = table.getTableName();
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(String.format("Updating table stats for: %s", tableName));
+    if (LOG.isInfoEnabled()) {
+      int numPartitions =
+          params.isSetPartition_stats() ? params.partition_stats.size() : 0;
+      int numColumns =
+          params.isSetColumn_stats() ? params.column_stats.size() : 0;
+      LOG.info(String.format(
+          "Updating stats for table %s: table-stats=%s partitions=%s column-stats=%s",
+          tableName, params.isSetTable_stats(), numPartitions, numColumns));
+    }
+
+    // Update column stats.
+    ColumnStatistics colStats = null;
+    numUpdatedColumns.setRef(Long.valueOf(0));
+    if (params.isSetColumn_stats()) {
+      colStats = createHiveColStats(params, table);
+      if (colStats.getStatsObjSize() > 0) {
+        try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          msClient.getHiveClient().updateTableColumnStatistics(colStats);
+        } catch (Exception e) {
+          throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
+              "updateTableColumnStatistics"), e);
+        }
+      }
+      numUpdatedColumns.setRef(Long.valueOf(colStats.getStatsObjSize()));
     }
 
     // Deep copy the msTbl to avoid updating our cache before successfully persisting
     // the results to the metastore.
     org.apache.hadoop.hive.metastore.api.Table msTbl =
         table.getMetaStoreTable().deepCopy();
-    List<HdfsPartition> partitions = Lists.newArrayList();
-    if (table instanceof HdfsTable) {
-      // Build a list of non-default partitions to update.
-      HdfsTable hdfsTable = (HdfsTable) table;
-      for (HdfsPartition p: hdfsTable.getPartitions()) {
-        if (!p.isDefaultPartition()) partitions.add(p);
-      }
-    }
 
-    long numTargetedPartitions = 0L;
-    long numTargetedColumns = 0L;
-    try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // Update the table and partition row counts based on the query results.
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
-      if (params.isSetTable_stats()) {
-        numTargetedPartitions = updateTableStats(table, params, msTbl, partitions,
-            modifiedParts);
-      }
+    // Update partition-level row counts and incremental column stats for
+    // partitioned Hdfs tables.
+    List<HdfsPartition> modifiedParts = null;
+    if (params.isSetPartition_stats() && table.getNumClusteringCols() > 0) {
+      Preconditions.checkState(table instanceof HdfsTable);
+      modifiedParts = updatePartitionStats(params, (HdfsTable) table);
+      bulkAlterPartitions(table.getDb().getName(), table.getName(), modifiedParts);
+    }
 
-      ColumnStatistics colStats = null;
-      if (params.isSetColumn_stats()) {
-        // Create Hive column stats from the query results.
-        colStats = createHiveColStats(params.getColumn_stats(), table);
-        numTargetedColumns = colStats.getStatsObjSize();
-      }
+    // Update table row count and total file bytes. Apply table alteration to HMS last to
+    // ensure the lastDdlTime is as accurate as possible.
+    if (params.isSetTable_stats()) updateTableStats(params, msTbl);
+    applyAlterTable(msTbl);
 
-      // Update all partitions.
-      bulkAlterPartitions(table.getDb().getName(), table.getName(), modifiedParts);
-      if (numTargetedColumns > 0) {
-        Preconditions.checkNotNull(colStats);
-        // Update column stats.
-        try {
-          msClient.getHiveClient().updateTableColumnStatistics(colStats);
-        } catch (Exception e) {
-          throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
-              "updateTableColumnStatistics"), e);
-        }
-      }
-      // Update the table stats. Apply the table alteration last to ensure the
-      // lastDdlTime is as accurate as possible.
-      applyAlterTable(msTbl);
+    numUpdatedPartitions.setRef(Long.valueOf(0));
+    if (modifiedParts != null) {
+      numUpdatedPartitions.setRef(Long.valueOf(modifiedParts.size()));
+    } else if (params.isSetTable_stats()) {
+      numUpdatedPartitions.setRef(Long.valueOf(1));
     }
-    numUpdatedPartitions.setRef(numTargetedPartitions);
-    numUpdatedColumns.setRef(numTargetedColumns);
   }
 
   /**
-   * Updates the row counts of the given Hive partitions and the total row count of the
-   * given Hive table based on the given update stats parameters. The partitions whose
-   * row counts have not changed are skipped. The modified partitions are returned
-   * in the modifiedParts parameter.
-   * Row counts for missing or new partitions as a result of concurrent table
-   * alterations are set to 0.
-   * Returns the number of partitions that were targeted for update (includes partitions
-   * whose row counts have not changed).
+   * Updates the row counts and incremental column stats of the partitions in the given
+   * Impala table based on the given update stats parameters. Returns the modified Impala
+   * partitions.
+   * Row counts for missing or new partitions as a result of concurrent table alterations
+   * are set to 0.
    */
-  private int updateTableStats(Table table, TAlterTableUpdateStatsParams params,
-      org.apache.hadoop.hive.metastore.api.Table msTbl,
-      List<HdfsPartition> partitions, List<HdfsPartition> modifiedParts)
-      throws ImpalaException {
+  private List<HdfsPartition> updatePartitionStats(TAlterTableUpdateStatsParams params,
+      HdfsTable table) throws ImpalaException {
     Preconditions.checkState(params.isSetPartition_stats());
-    Preconditions.checkState(params.isSetTable_stats());
-    // Update the partitions' ROW_COUNT parameter.
-    int numTargetedPartitions = 0;
-    for (HdfsPartition partition: partitions) {
+    List<HdfsPartition> modifiedParts = Lists.newArrayList();
+    for (HdfsPartition partition: table.getPartitions()) {
+      if (partition.isDefaultPartition()) continue;
+
       // NULL keys are returned as 'NULL' in the partition_stats map, so don't substitute
       // this partition's keys with Hive's replacement value.
       List<String> partitionValues = partition.getPartitionValuesAsStrings(false);
@@ -806,23 +788,27 @@ public class CatalogOpExecutor {
       partition.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
       // HMS requires this param for stats changes to take effect.
       partition.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
-      ++numTargetedPartitions;
       modifiedParts.add(partition);
     }
+    return modifiedParts;
+  }
 
-    // For unpartitioned tables and HBase tables report a single updated partition.
-    if (table.getNumClusteringCols() == 0 || table instanceof HBaseTable) {
-      numTargetedPartitions = 1;
-      if (table instanceof HdfsTable) {
-        Preconditions.checkState(modifiedParts.size() == 1);
-        // Delete stats for this partition as they are included in table stats.
-        PartitionStatsUtil.deletePartStats(modifiedParts.get(0));
-      }
+  /**
+   * Updates the row count and total file bytes of the given HMS table based on the
+   * the update stats parameters.
+   */
+  private void updateTableStats(TAlterTableUpdateStatsParams params,
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaException {
+    Preconditions.checkState(params.isSetTable_stats());
+    long numRows = params.table_stats.num_rows;
+    // Extrapolate based on sampling (if applicable).
+    if (params.isSetSample_file_bytes() && params.table_stats.isSetTotal_file_bytes()) {
+      numRows = getExtrapolatedStatsVal(numRows, params.sample_file_bytes,
+          params.table_stats.total_file_bytes);
     }
 
     // Update the table's ROW_COUNT and TOTAL_SIZE parameters.
-    msTbl.putToParameters(StatsSetupConst.ROW_COUNT,
-        String.valueOf(params.getTable_stats().num_rows));
+    msTbl.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
     if (params.getTable_stats().isSetTotal_file_bytes()) {
       msTbl.putToParameters(StatsSetupConst.TOTAL_SIZE,
           String.valueOf(params.getTable_stats().total_file_bytes));
@@ -830,28 +816,28 @@ public class CatalogOpExecutor {
     // HMS requires this param for stats changes to take effect.
     Pair<String, String> statsTaskParam = MetastoreShim.statsGeneratedViaStatsTaskParam();
     msTbl.putToParameters(statsTaskParam.first, statsTaskParam.second);
-    return numTargetedPartitions;
   }
 
   /**
-   * Create Hive column statistics for the given table based on the give map from column
+   * Create HMS column statistics for the given table based on the give map from column
    * name to column stats. Missing or new columns as a result of concurrent table
    * alterations are ignored.
    */
   private static ColumnStatistics createHiveColStats(
-      Map<String, TColumnStats> columnStats, Table table) {
+      TAlterTableUpdateStatsParams params, Table table) {
+    Preconditions.checkState(params.isSetColumn_stats());
     // Collection of column statistics objects to be returned.
     ColumnStatistics colStats = new ColumnStatistics();
     colStats.setStatsDesc(
         new ColumnStatisticsDesc(true, table.getDb().getName(), table.getName()));
     // Generate Hive column stats objects from the update stats params.
-    for (Map.Entry<String, TColumnStats> entry: columnStats.entrySet()) {
+    for (Map.Entry<String, TColumnStats> entry: params.getColumn_stats().entrySet()) {
       String colName = entry.getKey();
       Column tableCol = table.getColumn(entry.getKey());
       // Ignore columns that were dropped in the meantime.
       if (tableCol == null) continue;
       ColumnStatisticsData colStatsData =
-          createHiveColStatsData(entry.getValue(), tableCol.getType());
+          createHiveColStatsData(params, entry.getValue(), tableCol.getType());
       if (colStatsData == null) continue;
       if (LOG.isTraceEnabled()) {
         LOG.trace(String.format("Updating column stats for %s: numDVs=%s numNulls=%s " +
@@ -866,31 +852,58 @@ public class CatalogOpExecutor {
     return colStats;
   }
 
-  private static ColumnStatisticsData createHiveColStatsData(TColumnStats colStats,
-      Type colType) {
+  /**
+   * Returns 'val' extrapolated based on the sampled and total file bytes. Uses a basic
+   * linear extrapolation. All parameters must be >= 0.
+   * The returned value is >= 'val' and >= 0. Returns Long.MAX_VALUE if a computation
+   * overflows.
+   */
+  private static long getExtrapolatedStatsVal(long val, long sampleFileBytes,
+      long totalFileBytes) {
+    Preconditions.checkArgument(val >= 0 && sampleFileBytes >= 0 && totalFileBytes >= 0);
+    double mult = 0.0;
+    if (sampleFileBytes > 0) mult = (double) totalFileBytes / sampleFileBytes;
+    // The round() caps the returned value at Long.MAX_VALUE.
+    return Math.round(val * mult);
+  }
+
+  private static ColumnStatisticsData createHiveColStatsData(
+      TAlterTableUpdateStatsParams params, TColumnStats colStats, Type colType) {
     ColumnStatisticsData colStatsData = new ColumnStatisticsData();
-    long ndvs = colStats.getNum_distinct_values();
+    long ndv = colStats.getNum_distinct_values();
+    // Cap NDV at row count if available.
+    if (params.isSetTable_stats()) ndv = Math.min(ndv, params.table_stats.num_rows);
+    // Extrapolate NDV based on sampling if applicable.
+    if (params.isSetSample_file_bytes() && params.isSetTable_stats()
+        && params.table_stats.isSetTotal_file_bytes()) {
+      ndv = getExtrapolatedStatsVal(ndv, params.sample_file_bytes,
+          params.table_stats.total_file_bytes);
+    }
+
     long numNulls = colStats.getNum_nulls();
     switch(colType.getPrimitiveType()) {
       case BOOLEAN:
-        // TODO: Gather and set the numTrues and numFalse stats as well. The planner
-        // currently does not rely on them.
         colStatsData.setBooleanStats(new BooleanColumnStatsData(1, -1, numNulls));
         break;
       case TINYINT:
+        ndv = Math.min(ndv, LongMath.pow(2, Byte.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
       case SMALLINT:
+        ndv = Math.min(ndv, LongMath.pow(2, Short.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
       case INT:
+        ndv = Math.min(ndv, LongMath.pow(2, Integer.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
       case BIGINT:
       case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
-        // TODO: Gather and set the min/max values stats as well. The planner
-        // currently does not rely on them.
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndvs));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
         break;
       case FLOAT:
       case DOUBLE:
-        // TODO: Gather and set the min/max values stats as well. The planner
-        // currently does not rely on them.
-        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndvs));
+        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndv));
         break;
       case CHAR:
       case VARCHAR:
@@ -898,13 +911,12 @@ public class CatalogOpExecutor {
         long maxStrLen = colStats.getMax_size();
         double avgStrLen = colStats.getAvg_size();
         colStatsData.setStringStats(
-            new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndvs));
+            new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndv));
         break;
       case DECIMAL:
-        // TODO: Gather and set the min/max values stats as well. The planner
-        // currently does not rely on them.
-        colStatsData.setDecimalStats(
-            new DecimalColumnStatsData(numNulls, ndvs));
+        double decMaxNdv = Math.pow(10, colType.getPrecision());
+        ndv = (long) Math.min(ndv, decMaxNdv);
+        colStatsData.setDecimalStats(new DecimalColumnStatsData(numNulls, ndv));
         break;
       default:
         return null;
@@ -2916,7 +2928,7 @@ public class CatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Partition msPart = p.toHmsPartition();
       if (msPart != null) hmsPartitions.add(msPart);
     }
-    if (hmsPartitions.size() == 0) return;
+    if (hmsPartitions.isEmpty()) return;
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 11fea8d..6f6942a 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -44,7 +44,9 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
@@ -1151,9 +1153,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
   public void TestComputeStats() throws AnalysisException {
     // Analyze the stmt itself as well as the generated child queries.
     checkComputeStatsStmt("compute stats functional.alltypes");
-
     checkComputeStatsStmt("compute stats functional_hbase.alltypes");
-
     // Test that complex-typed columns are ignored.
     checkComputeStatsStmt("compute stats functional.allcomplextypes");
 
@@ -1185,6 +1185,37 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "column 'col1' of type 'string' does not match the Avro-schema column " +
         "'boolean1' of type 'BOOLEAN' at position '0'.\nPlease re-create the table " +
         "with column definitions, e.g., using the result of 'SHOW CREATE TABLE'");
+
+    // Test tablesample clause with extrapolation enabled/disabled. Replace/restore the
+    // static backend config for this test to control stats extrapolation.
+    BackendConfig origInstance = BackendConfig.INSTANCE;
+    try {
+      TBackendGflags testGflags = new TBackendGflags();
+      testGflags.setEnable_stats_extrapolation(true);
+      BackendConfig.create(testGflags);
+
+      checkComputeStatsStmt("compute stats functional.alltypes tablesample system (10)");
+      checkComputeStatsStmt(
+          "compute stats functional.alltypes tablesample system (55) repeatable(1)");
+      AnalysisError("compute stats functional.alltypes tablesample system (101)",
+          "Invalid percent of bytes value '101'. " +
+          "The percent of bytes to sample must be between 0 and 100.");
+      AnalysisError("compute stats functional_kudu.alltypes tablesample system (1)",
+          "TABLESAMPLE is only supported on HDFS tables.");
+      AnalysisError("compute stats functional_hbase.alltypes tablesample system (2)",
+          "TABLESAMPLE is only supported on HDFS tables.");
+      AnalysisError(
+          "compute stats functional.alltypes_datasource tablesample system (3)",
+          "TABLESAMPLE is only supported on HDFS tables.");
+
+      testGflags.setEnable_stats_extrapolation(false);
+      BackendConfig.create(testGflags);
+      AnalysisError("compute stats functional.alltypes tablesample system (10)",
+          "COMPUTE STATS TABLESAMPLE requires --enable_stats_extrapolation=true. " +
+          "Stats extrapolation is currently disabled.");
+    } finally {
+      BackendConfig.INSTANCE = origInstance;
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 1032d07..87d0761 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -338,26 +338,26 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     // Only applicable to HDFS base table refs.
     AnalysisError("select * from functional_kudu.alltypes tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: functional_kudu.alltypes");
+        "TABLESAMPLE is only supported on HDFS tables: functional_kudu.alltypes");
     AnalysisError("select * from functional_hbase.alltypes tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: functional_hbase.alltypes");
+        "TABLESAMPLE is only supported on HDFS tables: functional_hbase.alltypes");
     AnalysisError("select * from functional.alltypes_datasource tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: " +
+        "TABLESAMPLE is only supported on HDFS tables: " +
         "functional.alltypes_datasource");
     AnalysisError("select * from (select * from functional.alltypes) v " +
         "tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: v");
+        "TABLESAMPLE is only supported on HDFS tables: v");
     AnalysisError("with v as (select * from functional.alltypes) " +
         "select * from v tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: v");
+        "TABLESAMPLE is only supported on HDFS tables: v");
     AnalysisError("select * from functional.alltypes_view tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: functional.alltypes_view");
+        "TABLESAMPLE is only supported on HDFS tables: functional.alltypes_view");
     AnalysisError("select * from functional.allcomplextypes.int_array_col " +
         "tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: int_array_col");
+        "TABLESAMPLE is only supported on HDFS tables: int_array_col");
     AnalysisError("select * from functional.allcomplextypes a, a.int_array_col " +
         "tablesample system (10)",
-        "TABLESAMPLE is only supported on HDFS base tables: int_array_col");
+        "TABLESAMPLE is only supported on HDFS tables: int_array_col");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 1f9a6af..583e353 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -631,7 +631,7 @@ public class ParserTest extends FrontendTestBase {
     ParserError("select * from t tablesample system (10) repeatable");
     // Random seed must be an int literal.
     ParserError("select * from t tablesample system (10) repeatable (10 + 10)");
-    // Negative precent.
+    // Negative percent.
     ParserError("select * from t tablesample system (-10)");
     // Negative random seed.
     ParserError("select * from t tablesample system (10) repeatable(-10)");
@@ -3587,24 +3587,27 @@ public class ParserTest extends FrontendTestBase {
 
   @Test
   public void TestComputeStats() {
+    ParsesOk("COMPUTE STATS alltypes");
     ParsesOk("COMPUTE STATS functional.alltypes");
+    ParsesOk("COMPUTE STATS alltypes TABLESAMPLE SYSTEM(10)");
+    ParsesOk("COMPUTE STATS alltypes TABLESAMPLE SYSTEM(10) REPEATABLE(10)");
+    ParsesOk("COMPUTE STATS functional.alltypes TABLESAMPLE SYSTEM(10) REPEATABLE(10)");
     ParserError("COMPUTE functional.alltypes");
     ParserError("COMPUTE STATS ON functional.alltypes");
     ParserError("COMPUTE STATS");
   }
 
   @Test
-  public void TestComputeStatsIncremental() {
+  public void TestComputeIncrementalStats() {
     ParsesOk("COMPUTE INCREMENTAL STATS functional.alltypes");
-    ParserError("COMPUTE INCREMENTAL functional.alltypes");
-
     ParsesOk(
         "COMPUTE INCREMENTAL STATS functional.alltypes PARTITION(month=10, year=2010)");
-
+    ParsesOk(
+        "DROP INCREMENTAL STATS functional.alltypes PARTITION(month=10, year=2010)");
     ParserError("COMPUTE INCREMENTAL STATS");
-
-    ParsesOk("DROP INCREMENTAL STATS functional.alltypes PARTITION(month=10, year=2010)");
+    ParserError("COMPUTE INCREMENTAL functional.alltypes");
     ParserError("DROP INCREMENTAL STATS functional.alltypes");
+    ParserError("COMPUTE INCREMENTAL STATS functional.alltypes TABLESAMPLE SYSTEM(10)");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d8a507/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java b/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
index 0601aa3..0a5d95c 100644
--- a/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
+++ b/fe/src/test/java/org/apache/impala/hive/executor/UdfExecutorTest.java
@@ -71,6 +71,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;