Posted to commits@impala.apache.org by mi...@apache.org on 2017/12/18 17:55:58 UTC

[5/6] impala git commit: IMPALA-5310: Part 3: Use SAMPLED_NDV() in COMPUTE STATS.

IMPALA-5310: Part 3: Use SAMPLED_NDV() in COMPUTE STATS.

Modifies COMPUTE STATS TABLESAMPLE to use the new SAMPLED_NDV()
function.
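
For illustration, a minimal sketch of the change (table name, sampling percent,
and seed are illustrative; 0.1 stands in for the effective sampling rate that
is actually computed from the file sample):

    COMPUTE STATS functional.alltypes TABLESAMPLE SYSTEM(10) REPEATABLE(1);

    -- The column-stats child query now calls SAMPLED_NDV() instead of NDV():
    SELECT SAMPLED_NDV(int_col, 0.1000000000) AS int_col
    FROM functional.alltypes TABLESAMPLE SYSTEM(10) REPEATABLE(1);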

Testing:
- modified/improved existing functional tests
- core/hdfs run passed

Change-Id: I6ec0831f77698695975e45ec0bc0364c765d819b
Reviewed-on: http://gerrit.cloudera.org:8080/8840
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1f7b3b00
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1f7b3b00
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1f7b3b00

Branch: refs/heads/master
Commit: 1f7b3b00e921d68857abb22f48656ee4444c194c
Parents: 5a7c10e
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Dec 13 12:34:00 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Dec 16 04:58:59 2017 +0000

----------------------------------------------------------------------
 be/src/exec/catalog-op-executor.cc              |   3 -
 common/thrift/JniCatalog.thrift                 |   8 --
 .../impala/analysis/ComputeStatsStmt.java       | 124 +++++++++++------
 .../impala/service/CatalogOpExecutor.java       |  27 ----
 .../QueryTest/compute-stats-tablesample.test    | 137 -------------------
 tests/common/impala_test_suite.py               |  13 ++
 .../custom_cluster/test_stats_extrapolation.py  | 117 +++++++++++-----
 tests/query_test/test_aggregation.py            |   9 +-
 8 files changed, 179 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index bb9508b..7490ed1 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -117,9 +117,6 @@ Status CatalogOpExecutor::ExecComputeStats(
     update_stats_params.table_stats.__set_total_file_bytes(
         compute_stats_params.total_file_bytes);
   }
-  if (compute_stats_params.__isset.sample_file_bytes) {
-    update_stats_params.__set_sample_file_bytes(compute_stats_params.sample_file_bytes);
-  }
   // col_stats_schema and col_stats_data will be empty if there was no column stats query.
   if (!col_stats_schema.columns.empty()) {
     if (compute_stats_params.is_incremental) {

http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 18f30fe..6e746c7 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -295,10 +295,6 @@ struct TAlterTableUpdateStatsParams {
 
   // If true, this is the result of an incremental stats computation
   6: optional bool is_incremental
-
-  // Sum of file sizes in the table sample. Only set when TABLESAMPLE was specified.
-  // Only set for tables of type HDFS_TABLE and if is_incremental is false.
-  7: optional i64 sample_file_bytes
 }
 
 // Parameters for ALTER TABLE SET [PARTITION partitionSet] CACHED|UNCACHED
@@ -505,10 +501,6 @@ struct TComputeStatsParams {
   // Sum of file sizes in the table. Only set for tables of type HDFS_TABLE and if
   // is_incremental is false.
   9: optional i64 total_file_bytes
-
-  // Sum of file sizes in the table sample. Only set when TABLESAMPLE was specified.
-  // Only set for tables of type HDFS_TABLE and if is_incremental is false.
-  10: optional i64 sample_file_bytes
 }
 
 // Parameters for CREATE/DROP ROLE

http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 64dc9bf..ed81f89 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -58,11 +58,11 @@ import com.google.common.collect.Sets;
  *   Computes and replaces the table-level row count and total file size, as well as all
  *   table-level column statistics. Existing partition-objects and their row count are
  *   not modified at all. The TABLESAMPLE clause can be used to limit the scanned data
- *   volume to a desired percentage. When sampling, the results of the COMPUTE STATS
- *   queries are sent to the CatalogServer. There, the stats are extrapolated before
- *   storing them into the HMS so as not to confuse other engines like Hive/SparkSQL
- *   which may rely on the shared HMS fields representing to the whole table and not
- *   a sample. See {@link CatalogOpExecutor#getExtrapolatedStatsVal}.
+ *   volume to a desired percentage. When sampling, the COMPUTE STATS queries directly
+ *   produce extrapolated stats which are then stored in the HMS via the CatalogServer.
+ *   We store extrapolated stats in the HMS so as not to confuse other engines like
+ *   Hive/SparkSQL which may rely on the shared HMS fields representing the whole
+ *   table and not a sample. See {@link CatalogOpExecutor#getExtrapolatedStatsVal}.
  * - Stats extrapolation disabled:
  *   Computes and replaces the table-level row count and total file size, the row counts
  *   for all partitions (if applicable), as well as all table-level column statistics.
@@ -101,9 +101,9 @@ public class ComputeStatsStmt extends StatementBase {
   // Set during analysis.
   protected Table table_;
 
-  // Total number of bytes in the file sample of the target HDFS table. Set to -1 for
-  // non-HDFS tables or when TABLESAMPLE is not specified.
-  protected long sampleFileBytes_ = -1;
+  // Effective sampling percent based on the total number of bytes in the files sample.
+  // Set to -1 for non-HDFS tables or if TABLESAMPLE was not specified.
+  protected double effectiveSamplePerc_ = -1;
 
   // The Null count is not currently being used in optimization or run-time,
   // and compute stats runs 2x faster in many cases when not counting NULLs.
@@ -202,7 +202,11 @@ public class ComputeStatsStmt extends StatementBase {
       String colRefSql = ToSqlUtils.getIdentSql(c.getName());
       if (isIncremental_) {
         columnStatsSelectList.add("NDV_NO_FINALIZE(" + colRefSql + ") AS " + colRefSql);
+      } else if (effectiveSamplePerc_ > 0) {
+        columnStatsSelectList.add(String.format("SAMPLED_NDV(%s, %.10f) AS %s",
+            colRefSql, effectiveSamplePerc_, colRefSql));
       } else {
+        // Regular (non-incremental) compute stats without sampling.
         columnStatsSelectList.add("NDV(" + colRefSql + ") AS " + colRefSql);
       }
 
@@ -241,39 +245,58 @@ public class ComputeStatsStmt extends StatementBase {
   }
 
   /**
-   * Constructs two queries to compute statistics for 'tableName_', if that table exists
-   * (although if we can detect that no work needs to be done for either query, that query
-   * will be 'null' and not executed).
+   * Constructs two SQL queries for computing the row-count and column statistics and
+   * sets them in 'tableStatsQueryStr_' and 'columnStatsQueryStr_', respectively.
+   * The queries are generated as follows.
    *
-   * The first query computes the number of rows (on a per-partition basis if the table is
-   * partitioned) and has the form "SELECT COUNT(*) FROM tbl GROUP BY part_col1,
-   * part_col2...", with an optional WHERE clause for incremental computation (see below).
+   * 1. Regular COMPUTE STATS (not incremental and no sampling)
+   * 1.1 Row counts:
+   * SELECT COUNT(*) FROM tbl [GROUP BY part_col1, part_col2 ...]
+   * The GROUP BY clause is added if the target is a partitioned HDFS table and
+   * stats extrapolation is disabled. Otherwise, no GROUP BY is used.
    *
-   * The second query computes the NDV estimate, the average width, the maximum width and,
-   * optionally, the number of nulls for each column. For non-partitioned tables (or
-   * non-incremental computations), the query is simple:
+   * 1.2 Column stats:
+   * SELECT NDV(c1), CAST(-1 as typeof(c1)), MAX(length(c1)), AVG(length(c1)),
+   *        NDV(c2), CAST(-1 as typeof(c2)), MAX(length(c2)), AVG(length(c2)),
+   *        ...
+   * FROM tbl
    *
-   * SELECT NDV(col), COUNT(<nulls>), MAX(length(col)), AVG(length(col)) FROM tbl
+   * 2. COMPUTE STATS with TABLESAMPLE
+   * 2.1 Row counts:
+   * SELECT ROUND(COUNT(*) / <effective_sample_perc>)
+   * FROM tbl TABLESAMPLE SYSTEM(<sample_perc>) REPEATABLE (<random_seed>)
    *
-   * (For non-string columns, the widths are hard-coded as they are known at query
-   * construction time).
+   * 2.2 Column stats:
+   * SELECT SAMPLED_NDV(c1, p), CAST(-1 as typeof(c1)), MAX(length(c1)), AVG(length(c1)),
+   *        SAMPLED_NDV(c2, p), CAST(-1 as typeof(c2)), MAX(length(c2)), AVG(length(c2)),
+   *        ...
+   * FROM tbl TABLESAMPLE SYSTEM(<sample_perc>) REPEATABLE (<random_seed>)
+   * SAMPLED_NDV() is a specialized aggregation function that estimates the NDV based on
+   * a sample. The "p" passed to SAMPLED_NDV() is the effective sampling rate.
    *
-   * If computation is incremental (i.e. the original statement was COMPUTE INCREMENTAL
-   * STATS.., and the underlying table is a partitioned HdfsTable), some modifications are
-   * made to the non-incremental per-column query. First, a different UDA,
-   * NDV_NO_FINALIZE() is used to retrieve and serialise the intermediate state from each
-   * column. Second, the results are grouped by partition, as with the row count query, so
-   * that the intermediate NDV computation state can be stored per-partition. The number
-   * of rows per-partition are also recorded.
+   * 3. COMPUTE INCREMENTAL STATS
+   * 3.1 Row counts:
+   * SELECT COUNT(*) FROM tbl GROUP BY part_col1, part_col2 ...
+   * [WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR
+   *        ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...]
+   * The WHERE clause is constructed to select the relevant partitions.
    *
-   * For both the row count query, and the column stats query, the query's WHERE clause is
-   * used to restrict execution only to partitions that actually require new statistics to
-   * be computed.
-   *
-   * SELECT NDV_NO_FINALIZE(col), <nulls, max, avg>, COUNT(col) FROM tbl
+   * 3.2 Column stats:
+   * SELECT NDV_NO_FINALIZE(c1), <nulls, max, avg>, COUNT(c1),
+   *        NDV_NO_FINALIZE(c2), <nulls, max, avg>, COUNT(c2),
+   *        ...
+   * FROM tbl
    * GROUP BY part_col1, part_col2, ...
-   * WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR
-   *       ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...
+   * [WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR
+   *       ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...]
+   * The WHERE clause is constructed to select the relevant partitions.
+   * NDV_NO_FINALIZE() produces a non-finalized HyperLogLog intermediate byte array.
+   *
+   * 4. For all COMPUTE STATS variants:
+   * - The MAX() and AVG() for the column stats queries are only relevant for var-len
+   *   columns like STRING. For fixed-len columns, MAX() and AVG() are replaced with the
+   *   appropriate literals.
+   * - Queries will be set to null if we can detect that no work needs to be performed.
    */
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
@@ -432,10 +455,17 @@ public class ComputeStatsStmt extends StatementBase {
       validPartStats_.clear();
     }
 
+    // Tablesample clause to be used for all child queries.
+    String tableSampleSql = analyzeTableSampleClause(analyzer);
 
     // Query for getting the per-partition row count and the total row count.
     StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT ");
-    List<String> tableStatsSelectList = Lists.newArrayList("COUNT(*)");
+    String countSql = "COUNT(*)";
+    if (effectiveSamplePerc_ > 0) {
+      // Extrapolate the count based on the effective sampling rate.
+      countSql = String.format("ROUND(COUNT(*) / %.10f)", effectiveSamplePerc_);
+    }
+    List<String> tableStatsSelectList = Lists.newArrayList(countSql);
     // Add group by columns for incremental stats or with extrapolation disabled.
     List<String> groupByCols = Lists.newArrayList();
     if (!updateTableStatsOnly()) {
@@ -445,8 +475,6 @@ public class ComputeStatsStmt extends StatementBase {
       tableStatsSelectList.addAll(groupByCols);
     }
     tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList));
-    // Tablesample clause to be used for all child queries.
-    String tableSampleSql = analyzeTableSampleClause(analyzer);
     tableStatsQueryBuilder.append(" FROM " + tableName_.toSql() + tableSampleSql);
 
     // Query for getting the per-column NDVs and number of NULLs.
@@ -495,7 +523,8 @@ public class ComputeStatsStmt extends StatementBase {
   }
 
   /**
-   * Analyzes the TABLESAMPLE clause and computes the sample to set 'sampleFileBytes_'.
+   * Analyzes the TABLESAMPLE clause and computes the files sample to set
+   * 'effectiveSamplePerc_'.
    * Returns the TABLESAMPLE SQL to be used for all child queries or an empty string if
    * not sampling. If sampling, the returned SQL includes a fixed random seed so all
    * child queries generate a consistent sample, even if the user did not originally
@@ -524,10 +553,20 @@ public class ComputeStatsStmt extends StatementBase {
     HdfsTable hdfsTable = (HdfsTable) table_;
     Map<Long, List<FileDescriptor>> sample = hdfsTable.getFilesSample(
         hdfsTable.getPartitions(), sampleParams_.getPercentBytes(), sampleSeed);
-    sampleFileBytes_ = 0;
+    long sampleFileBytes = 0;
     for (List<FileDescriptor> fds: sample.values()) {
-      for (FileDescriptor fd: fds) sampleFileBytes_ += fd.getFileLength();
+      for (FileDescriptor fd: fds) sampleFileBytes += fd.getFileLength();
     }
+
+    // Compute effective sampling percent.
+    long totalFileBytes = ((HdfsTable)table_).getTotalHdfsBytes();
+    if (totalFileBytes > 0) {
+      effectiveSamplePerc_ = (double) sampleFileBytes / (double) totalFileBytes;
+    } else {
+      effectiveSamplePerc_ = 0;
+    }
+    Preconditions.checkState(effectiveSamplePerc_ >= 0.0 && effectiveSamplePerc_ <= 1.0);
+
     return " " + sampleParams_.toSql(sampleSeed);
   }
 
@@ -666,13 +705,8 @@ public class ComputeStatsStmt extends StatementBase {
     if (isIncremental_) {
       params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols());
     }
-
     if (table_ instanceof HdfsTable) {
       params.setTotal_file_bytes(((HdfsTable)table_).getTotalHdfsBytes());
-      if (sampleParams_ != null) {
-        Preconditions.checkState(sampleFileBytes_ >= 0);
-        if (sampleFileBytes_ != -1) params.setSample_file_bytes(sampleFileBytes_);
-      }
     }
     return params;
   }
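
The effective sampling percent computed above is the ratio of sampled file bytes
to total file bytes, and the sampled row count is extrapolated by dividing by
that ratio. A worked sketch with made-up numbers: if the file sample covers 60MB
of a 600MB table, effectiveSamplePerc_ is 0.1 and the row-count child query
becomes

    SELECT ROUND(COUNT(*) / 0.1000000000)
    FROM tbl TABLESAMPLE SYSTEM(10) REPEATABLE(12345);

so a sampled count of 360 rows is stored as an extrapolated count of 3600.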

http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ba42583..b0ed45f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -801,12 +801,6 @@ public class CatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaException {
     Preconditions.checkState(params.isSetTable_stats());
     long numRows = params.table_stats.num_rows;
-    // Extrapolate based on sampling (if applicable).
-    if (params.isSetSample_file_bytes() && params.table_stats.isSetTotal_file_bytes()) {
-      numRows = getExtrapolatedStatsVal(numRows, params.sample_file_bytes,
-          params.table_stats.total_file_bytes);
-    }
-
     // Update the table's ROW_COUNT and TOTAL_SIZE parameters.
     msTbl.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
     if (params.getTable_stats().isSetTotal_file_bytes()) {
@@ -852,33 +846,12 @@ public class CatalogOpExecutor {
     return colStats;
   }
 
-  /**
-   * Returns 'val' extrapolated based on the sampled and total file bytes. Uses a basic
-   * linear extrapolation. All parameters must be >= 0.
-   * The returned value is >= 'val' and >= 0. Returns Long.MAX_VALUE if a computation
-   * overflows.
-   */
-  private static long getExtrapolatedStatsVal(long val, long sampleFileBytes,
-      long totalFileBytes) {
-    Preconditions.checkArgument(val >= 0 && sampleFileBytes >= 0 && totalFileBytes >= 0);
-    double mult = 0.0;
-    if (sampleFileBytes > 0) mult = (double) totalFileBytes / sampleFileBytes;
-    // The round() caps the returned value at Long.MAX_VALUE.
-    return Math.round(val * mult);
-  }
-
   private static ColumnStatisticsData createHiveColStatsData(
       TAlterTableUpdateStatsParams params, TColumnStats colStats, Type colType) {
     ColumnStatisticsData colStatsData = new ColumnStatisticsData();
     long ndv = colStats.getNum_distinct_values();
     // Cap NDV at row count if available.
     if (params.isSetTable_stats()) ndv = Math.min(ndv, params.table_stats.num_rows);
-    // Extrapolate NDV based on sampling if applicable.
-    if (params.isSetSample_file_bytes() && params.isSetTable_stats()
-        && params.table_stats.isSetTotal_file_bytes()) {
-      ndv = getExtrapolatedStatsVal(ndv, params.sample_file_bytes,
-          params.table_stats.total_file_bytes);
-    }
 
     long numNulls = colStats.getNum_nulls();
     switch(colType.getPrimitiveType()) {
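
The removed getExtrapolatedStatsVal() scaled stats linearly by total/sampled
file bytes. That model is reasonable for row counts but not for NDV, which does
not grow linearly with data volume; SAMPLED_NDV() instead estimates the NDV
directly from the sample. A sketch of the difference, with hypothetical values:

    -- bool_col has a true NDV of 2. With a 10% sample, the old catalog-side
    -- extrapolation would report round(2 * 10) = 20; the new query-side
    -- estimate stays near 2:
    SELECT SAMPLED_NDV(bool_col, 0.1000000000)
    FROM tbl TABLESAMPLE SYSTEM(10) REPEATABLE(12345);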

http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/testdata/workloads/functional-query/queries/QueryTest/compute-stats-tablesample.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-tablesample.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-tablesample.test
deleted file mode 100644
index 893f428..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-tablesample.test
+++ /dev/null
@@ -1,137 +0,0 @@
-====
----- QUERY
-compute stats alltypes tablesample system (50) repeatable (1)
----- RESULTS
-'Updated 1 partition(s) and 11 column(s).'
----- TYPES
-STRING
-====
----- QUERY
-# Only the table-level row count is stored. The partition row counts are extrapolated.
-show table stats alltypes
----- LABELS
-YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
----- RESULTS
-'2009','1',-1,305,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=1'
-'2009','2',-1,277,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=2'
-'2009','3',-1,306,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=3'
-'2009','4',-1,299,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=4'
-'2009','5',-1,311,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=5'
-'2009','6',-1,301,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=6'
-'2009','7',-1,311,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=7'
-'2009','8',-1,311,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=8'
-'2009','9',-1,301,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=9'
-'2009','10',-1,311,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=10'
-'2009','11',-1,301,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=11'
-'2009','12',-1,311,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=12'
-'Total','',3643,3643,12,regex:.*B,'0B','','','',''
----- TYPES
-STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING
-====
----- QUERY
-show column stats alltypes
----- LABELS
-COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
----- RESULTS
-'id','INT',3643,-1,4,4
-'bool_col','BOOLEAN',2,-1,1,1
-'tinyint_col','TINYINT',17,-1,1,1
-'smallint_col','SMALLINT',17,-1,2,2
-'int_col','INT',17,-1,4,4
-'bigint_col','BIGINT',17,-1,8,8
-'float_col','FLOAT',17,-1,4,4
-'double_col','DOUBLE',17,-1,8,8
-'date_string_col','STRING',364,-1,8,8
-'string_col','STRING',17,-1,1,1
-'timestamp_col','TIMESTAMP',3612,-1,16,16
-'year','INT',1,0,4,4
-'month','INT',12,0,4,4
----- TYPES
-STRING,STRING,BIGINT,BIGINT,BIGINT,DOUBLE
-====
----- QUERY
-# Repeat tests on a very small sample.
-compute stats alltypes tablesample system (1) repeatable (1)
----- RESULTS
-'Updated 1 partition(s) and 11 column(s).'
----- TYPES
-STRING
-====
----- QUERY
-show table stats alltypes
----- LABELS
-YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
----- RESULTS
-'2009','1',-1,304,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=1'
-'2009','2',-1,276,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=2'
-'2009','3',-1,305,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=3'
-'2009','4',-1,298,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=4'
-'2009','5',-1,310,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=5'
-'2009','6',-1,300,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=6'
-'2009','7',-1,310,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=7'
-'2009','8',-1,310,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=8'
-'2009','9',-1,300,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=9'
-'2009','10',-1,310,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=10'
-'2009','11',-1,300,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=11'
-'2009','12',-1,310,1,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypes/year=2009/month=12'
-'Total','',3633,3633,12,regex:.*B,'0B','','','',''
----- TYPES
-STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING
-====
----- QUERY
-show column stats alltypes
----- LABELS
-COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
----- RESULTS
-'id','INT',3633,-1,4,4
-'bool_col','BOOLEAN',2,-1,1,1
-'tinyint_col','TINYINT',117,-1,1,1
-'smallint_col','SMALLINT',117,-1,2,2
-'int_col','INT',117,-1,4,4
-'bigint_col','BIGINT',117,-1,8,8
-'float_col','FLOAT',117,-1,4,4
-'double_col','DOUBLE',117,-1,8,8
-'date_string_col','STRING',352,-1,8,8
-'string_col','STRING',117,-1,1,1
-'timestamp_col','TIMESTAMP',3633,-1,16,16
-'year','INT',1,0,4,4
-'month','INT',12,0,4,4
----- TYPES
-STRING,STRING,BIGINT,BIGINT,BIGINT,DOUBLE
-====
----- QUERY
-# Test unpartitioned table.
-compute stats alltypesnopart tablesample system (10) repeatable (999)
----- RESULTS
-'Updated 1 partition(s) and 11 column(s).'
----- TYPES
-STRING
-====
----- QUERY
-show table stats alltypesnopart
----- LABELS
-#ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
----- RESULTS
-3660,3660,12,regex:.*B,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypesnopart'
----- TYPES
-BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING
-====
----- QUERY
-show column stats alltypesnopart
----- LABELS
-COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
----- RESULTS
-'id','INT',3660,-1,4,4
-'bool_col','BOOLEAN',2,-1,1,1
-'tinyint_col','TINYINT',63,-1,1,1
-'smallint_col','SMALLINT',63,-1,2,2
-'int_col','INT',63,-1,4,4
-'bigint_col','BIGINT',63,-1,8,8
-'float_col','FLOAT',63,-1,4,4
-'double_col','DOUBLE',63,-1,8,8
-'date_string_col','STRING',360,-1,8,8
-'string_col','STRING',63,-1,1,1
-'timestamp_col','TIMESTAMP',3660,-1,16,16
----- TYPES
-STRING,STRING,BIGINT,BIGINT,BIGINT,DOUBLE
-====

http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index b35e054..86bbf71 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -623,6 +623,19 @@ class ImpalaTestSuite(BaseTestSuite):
     self.hive_client.drop_table(db_name, table_name, True)
     self.hive_client.create_table(table)
 
+  def clone_table(self, src_tbl, dst_tbl, recover_partitions, vector):
+    src_loc = self._get_table_location(src_tbl, vector)
+    self.client.execute("create external table {0} like {1} location '{2}'"\
+        .format(dst_tbl, src_tbl, src_loc))
+    if recover_partitions:
+      self.client.execute("alter table {0} recover partitions".format(dst_tbl))
+
+  def appx_equals(self, a, b, diff_perc):
+    """Returns True if 'a' and 'b' are within 'diff_perc' percent of each other,
+    False otherwise. 'diff_perc' must be a float in [0,1]."""
+    if a == b: return True # Avoid division by 0
+    assert abs(a - b) / float(max(a,b)) <= diff_perc
+
   def _get_table_location(self, table_name, vector):
     """ Returns the HDFS location of the table """
     result = self.execute_query_using_client(self.client,

http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/tests/custom_cluster/test_stats_extrapolation.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_stats_extrapolation.py b/tests/custom_cluster/test_stats_extrapolation.py
index acf259a..6d0e34d 100644
--- a/tests/custom_cluster/test_stats_extrapolation.py
+++ b/tests/custom_cluster/test_stats_extrapolation.py
@@ -45,40 +45,93 @@ class TestStatsExtrapolation(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(impalad_args=('--enable_stats_extrapolation=true'))
   def test_compute_stats_tablesample(self, vector, unique_database):
-    # Create a partitioned and unpartitioned text table. Use the existing files from
-    # functional.alltypes as data because those have a known, stable file size. This
-    # test is sensitive to changes in file sizes across test runs because the sampling
-    # is file based. Creating test tables with INSERT does not guarantee that the same
-    # file sample is selected across test runs, even with REPEATABLE.
+    """COMPUTE STATS TABLESAMPLE is inherently non-deterministic due to its use of
+    SAMPLED_NDV() so we test it specially. The goal of this test is to ensure that
+    COMPUTE STATS TABLESAMPLE computes in-the-right-ballpark stats and successfully
+    stores them in the HMS."""
 
-    # Create partitioned test table. External to avoid dropping files from alltypes.
+    # Test partitioned table.
     part_test_tbl = unique_database + ".alltypes"
-    self.client.execute(
-      "create external table %s like functional.alltypes" % part_test_tbl)
-    alltypes_loc = self._get_table_location("functional.alltypes", vector)
-    for m in xrange(1, 13):
-      part_loc = path.join(alltypes_loc, "year=2009/month=%s" % m)
-      self.client.execute(
-        "alter table %s add partition (year=2009,month=%s) location '%s'"
-        % (part_test_tbl, m, part_loc))
+    self.clone_table("functional.alltypes", part_test_tbl, True, vector)
+    self.__run_sampling_test(part_test_tbl, "functional.alltypes", 1, 3)
+    self.__run_sampling_test(part_test_tbl, "functional.alltypes", 10, 7)
+    self.__run_sampling_test(part_test_tbl, "functional.alltypes", 20, 13)
+    self.__run_sampling_test(part_test_tbl, "functional.alltypes", 100, 99)
 
-    # Create unpartitioned test table.
+    # Test unpartitioned table.
     nopart_test_tbl = unique_database + ".alltypesnopart"
-    self.client.execute("drop table if exists %s" % nopart_test_tbl)
-    self.client.execute(
-      "create table %s like functional.alltypesnopart" % nopart_test_tbl)
-    nopart_test_tbl_loc = self._get_table_location(nopart_test_tbl, vector)
-    # Remove NameNode prefix and first '/' because PyWebHdfs expects that
-    if nopart_test_tbl_loc.startswith(NAMENODE):
-      nopart_test_tbl_loc = nopart_test_tbl_loc[len(NAMENODE)+1:]
-    for m in xrange(1, 13):
-      src_part_loc = alltypes_loc + "/year=2009/month=%s" % m
-      # Remove NameNode prefix and first '/' because PyWebHdfs expects that
-      if src_part_loc.startswith(NAMENODE): src_part_loc = src_part_loc[len(NAMENODE)+1:]
-      file_names = self.filesystem_client.ls(src_part_loc)
-      for f in file_names:
-        self.filesystem_client.copy(path.join(src_part_loc, f),
-                                    path.join(nopart_test_tbl_loc, f))
-    self.client.execute("refresh %s" % nopart_test_tbl)
+    self.client.execute("create table {0} as select * from functional.alltypes"\
+      .format(nopart_test_tbl))
+    # Clone to use as a baseline. We run the regular COMPUTE STATS on this table.
+    nopart_test_tbl_exp = unique_database + ".alltypesnopart_exp"
+    self.clone_table(nopart_test_tbl, nopart_test_tbl_exp, False, vector)
+    self.client.execute("compute stats {0}".format(nopart_test_tbl_exp))
+    self.__run_sampling_test(nopart_test_tbl, nopart_test_tbl_exp, 1, 3)
+    self.__run_sampling_test(nopart_test_tbl, nopart_test_tbl_exp, 10, 7)
+    self.__run_sampling_test(nopart_test_tbl, nopart_test_tbl_exp, 20, 13)
+    self.__run_sampling_test(nopart_test_tbl, nopart_test_tbl_exp, 100, 99)
+
+    # Test empty table.
+    empty_test_tbl = unique_database + ".empty"
+    self.clone_table("functional.alltypes", empty_test_tbl, False, vector)
+    self.__run_sampling_test(empty_test_tbl, empty_test_tbl, 10, 7)
+
+    # Test wide table. Should not crash or error. This takes a few minutes so restrict
+    # to exhaustive.
+    if self.exploration_strategy() == "exhaustive":
+      wide_test_tbl = unique_database + ".wide"
+      self.clone_table("functional.widetable_1000_cols", wide_test_tbl, False, vector)
+      self.client.execute(
+        "compute stats {0} tablesample system(10)".format(wide_test_tbl))
+
+  def __run_sampling_test(self, tbl, expected_tbl, perc, seed):
+    """Drops stats on 'tbl' and then runs COMPUTE STATS TABLESAMPLE on 'tbl' with the
+    given sampling percent and random seed. Checks that the resulting table and column
+    stats are reasonably close to those of 'expected_tbl'."""
+    self.client.execute("drop stats {0}".format(tbl))
+    self.client.execute("compute stats {0} tablesample system ({1}) repeatable ({2})"\
+      .format(tbl, perc, seed))
+    self.__check_table_stats(tbl, expected_tbl)
+    self.__check_column_stats(tbl, expected_tbl)
+
+  def __check_table_stats(self, tbl, expected_tbl):
+    """Checks that the row counts reported in SHOW TABLE STATS on 'tbl' are within 2x
+    of those reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
+    on 'expected_tbl' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
+    actual = self.client.execute("show table stats {0}".format(tbl))
+    expected = self.client.execute("show table stats {0}".format(expected_tbl))
+    assert len(actual.data) == len(expected.data)
+    assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
+    col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
+    rows_col_idx = col_names.index("#ROWS")
+    extrap_rows_col_idx = col_names.index("EXTRAP #ROWS")
+    for i in xrange(0, len(actual.data)):
+      act_cols = actual.data[i].split("\t")
+      exp_cols = expected.data[i].split("\t")
+      assert int(exp_cols[rows_col_idx]) >= 0
+      self.appx_equals(\
+        int(act_cols[extrap_rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
+      # Only the table-level row count is stored. The partition row counts
+      # are extrapolated.
+      if act_cols[0] == "Total":
+        self.appx_equals(
+          int(act_cols[rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
+      elif len(actual.data) > 1:
+        # Partition row count is expected to not be set.
+        assert int(act_cols[rows_col_idx]) == -1
 
-    self.run_test_case('QueryTest/compute-stats-tablesample', vector, unique_database)
+  def __check_column_stats(self, tbl, expected_tbl):
+    """Checks that the NDVs in SHOW COLUMNS STATS on 'tbl' are within 2x of those
+    reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
+    on 'expected_table' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
+    actual = self.client.execute("show column stats {0}".format(tbl))
+    expected = self.client.execute("show column stats {0}".format(expected_tbl))
+    assert len(actual.data) == len(expected.data)
+    assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
+    col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
+    ndv_col_idx = col_names.index("#DISTINCT VALUES")
+    for i in xrange(0, len(actual.data)):
+      act_cols = actual.data[i].split("\t")
+      exp_cols = expected.data[i].split("\t")
+      assert int(exp_cols[ndv_col_idx]) >= 0
+      self.appx_equals(int(act_cols[ndv_col_idx]), int(exp_cols[ndv_col_idx]), 2)

http://git-wip-us.apache.org/repos/asf/impala/blob/1f7b3b00/tests/query_test/test_aggregation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 233c33a..aee2236 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -329,7 +329,7 @@ class TestAggregationQueries(ImpalaTestSuite):
       # Low NDV columns. We expect a reasonably accurate estimate regardless of the
       # sampling percent.
       for i in xrange(0, 14):
-        self.__appx_equals(int(sampled_ndv_vals[i]), int(ndv_vals[i]), 0.1)
+        self.appx_equals(int(sampled_ndv_vals[i]), int(ndv_vals[i]), 0.1)
       # High NDV columns. We expect the estimate to have high variance and error.
       # Since we give NDV() and SAMPLED_NDV() the same input data, i.e., we are not
       # actually sampling for SAMPLED_NDV(), we expect the result of SAMPLED_NDV() to
@@ -337,12 +337,7 @@ class TestAggregationQueries(ImpalaTestSuite):
       # For example, the column 'id' is a PK so we expect the result of SAMPLED_NDV()
       # with a sampling percent of 0.1 to be approximately 10x of the NDV().
       for i in xrange(14, 16):
-        self.__appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)
-
-  def __appx_equals(self, a, b, diff_perc):
-    """Returns True if 'a' and 'b' are within 'diff_perc' percent of each other,
-    False otherwise. 'diff_perc' must be a float in [0,1]."""
-    assert abs(a - b) / float(max(a, b)) <= diff_perc
+        self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)
 
 class TestWideAggregationQueries(ImpalaTestSuite):
   """Test that aggregations with many grouping columns work"""