Posted to commits@impala.apache.org by ta...@apache.org on 2017/07/06 17:44:32 UTC

[2/2] incubator-impala git commit: IMPALA-5036: Parquet count star optimization

IMPALA-5036: Parquet count star optimization

Instead of materializing empty rows when computing a count(*), we use
the data stored in the Parquet RowGroup.num_rows field. The Parquet
scanner tuple is modified to have a single slot into which we write the
num_rows statistic. The aggregate function is changed from count to a
special sum function (sum_init_zero) whose intermediate value is
initialized to 0. We also add a rewrite rule that rewrites
count(<literal>) to count(*), so that this optimization is applied in
all cases.
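
For illustration, the optimized plan reduces to summing the per-row-group
num_rows values from the file footers. A minimal Java sketch of that
semantics (an illustrative model only, not the actual scanner or
aggregate code):

  import java.util.Arrays;
  import java.util.List;

  public class ParquetCountStarModel {
    // Models the optimized plan: the scan emits one BIGINT per row group
    // (RowGroup.num_rows) instead of one tuple per row, and the
    // aggregation folds these values with sum_init_zero.
    static long optimizedCountStar(List<Long> rowGroupNumRows) {
      long total = 0;  // sum_init_zero starts its intermediate value at 0
      for (long numRows : rowGroupNumRows) total += numRows;
      return total;
    }

    public static void main(String[] args) {
      // e.g. a file with three row groups of 330, 330 and 70 rows
      System.out.println(optimizedCountStar(Arrays.asList(330L, 330L, 70L)));
    }
  }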

Testing:
- Added functional and planner tests

Change-Id: I536b85c014821296aed68a0c68faadae96005e62
Reviewed-on: http://gerrit.cloudera.org:8080/6812
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/57d7c614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/57d7c614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/57d7c614

Branch: refs/heads/master
Commit: 57d7c614bcce018f486f1bfeccba369cfc21391c
Parents: d7d6c03
Author: Taras Bobrovytsky <tb...@cloudera.com>
Authored: Thu Apr 6 13:51:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jul 6 01:26:44 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  53 ++-
 be/src/exec/hdfs-scan-node-base.cc              |   5 +
 be/src/exec/hdfs-scan-node-base.h               |  11 +
 common/thrift/PlanNodes.thrift                  |   4 +
 .../apache/impala/analysis/AggregateInfo.java   |  11 +
 .../org/apache/impala/analysis/Analyzer.java    |   6 +-
 .../impala/analysis/FunctionCallExpr.java       |  22 +-
 .../apache/impala/analysis/FunctionParams.java  |   2 +-
 .../apache/impala/analysis/TupleDescriptor.java |   2 +-
 .../org/apache/impala/catalog/BuiltinsDb.java   |  10 +
 .../org/apache/impala/planner/HdfsScanNode.java |  94 +++-
 .../org/apache/impala/planner/PlanNode.java     |   4 +-
 .../org/apache/impala/planner/ScanNode.java     |   2 +-
 .../impala/planner/SingleNodePlanner.java       |  74 +--
 .../impala/rewrite/NormalizeCountStarRule.java  |  55 +++
 .../impala/analysis/ExprRewriteRulesTest.java   |  17 +-
 .../org/apache/impala/planner/PlannerTest.java  |   3 +
 .../queries/PlannerTest/disable-codegen.test    |   8 +-
 .../queries/PlannerTest/distinct.test           |  16 +-
 .../queries/PlannerTest/join-order.test         |   2 +-
 .../queries/PlannerTest/parquet-stats-agg.test  | 349 ++++++++++++++
 .../PlannerTest/resource-requirements.test      |  12 +-
 .../queries/QueryTest/aggregation.test          |  64 +++
 .../queries/QueryTest/parquet-stats-agg.test    | 117 +++++
 .../queries/QueryTest/parquet-stats.test        | 460 +++++++++++++++++++
 .../queries/QueryTest/parquet_stats.test        | 460 -------------------
 tests/query_test/test_aggregation.py            |   9 +
 tests/query_test/test_parquet_stats.py          |   3 +-
 28 files changed, 1333 insertions(+), 542 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 2689045..63298cb 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -18,7 +18,6 @@
 #include "exec/hdfs-parquet-scanner.h"
 
 #include <limits> // for std::numeric_limits
-#include <memory>
 #include <queue>
 
 #include <gflags/gflags.h>
@@ -374,7 +373,7 @@ static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
 }
 
 int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& column_readers) {
-  DCHECK(!column_readers.empty());
+  DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
   int num_columns = 0;
   stack<ParquetColumnReader*> readers;
   for (ParquetColumnReader* r: column_readers_) readers.push(r);
@@ -425,9 +424,39 @@ Status HdfsParquetScanner::ProcessSplit() {
 }
 
 Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
-  if (scan_node_->IsZeroSlotTableScan()) {
-    // There are no materialized slots, e.g. count(*) over the table.  We can serve
-    // this query from just the file metadata. We don't need to read the column data.
+  if (scan_node_->optimize_parquet_count_star()) {
+    // Populate the single slot with the Parquet num rows statistic.
+    int64_t tuple_buf_size;
+    uint8_t* tuple_buf;
+    // We try to allocate a smaller row batch here because in most cases the number of
+    // row groups in a file is much lower than the default row batch capacity.
+    int capacity = min(
+        static_cast<int>(file_metadata_.row_groups.size()), row_batch->capacity());
+    RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
+        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
+        &capacity, &tuple_buf_size, &tuple_buf));
+    while (!row_batch->AtCapacity()) {
+      RETURN_IF_ERROR(NextRowGroup());
+      DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+      DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
+      if (row_group_idx_ == file_metadata_.row_groups.size()) break;
+      Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
+      TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
+      InitTuple(template_tuple_, dst_tuple);
+      int64_t* dst_slot = reinterpret_cast<int64_t*>(dst_tuple->GetSlot(
+          scan_node_->parquet_count_star_slot_offset()));
+      *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
+      row_group_rows_read_ += *dst_slot;
+      dst_row->SetTuple(0, dst_tuple);
+      row_batch->CommitLastRow();
+      tuple_buf += scan_node_->tuple_desc()->byte_size();
+    }
+    eos_ = row_group_idx_ == file_metadata_.row_groups.size();
+    return Status::OK();
+  } else if (scan_node_->IsZeroSlotTableScan()) {
+    // There are no materialized slots and we are not optimizing count(*), e.g.
+    // "select 1 from alltypes". We can serve this query from just the file metadata.
+    // We don't need to read the column data.
     if (row_group_rows_read_ == file_metadata_.num_rows) {
       eos_ = true;
       return Status::OK();
@@ -466,7 +495,8 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
       if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
     }
     RETURN_IF_ERROR(NextRowGroup());
-    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+    DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+    if (row_group_idx_ == file_metadata_.row_groups.size()) {
       eos_ = true;
       DCHECK(parse_status_.ok());
       return Status::OK();
@@ -540,7 +570,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     }
 
     int col_idx = node->col_idx;
-    DCHECK(col_idx < row_group.columns.size());
+    DCHECK_LT(col_idx, row_group.columns.size());
 
     const vector<parquet::ColumnOrder>& col_orders = file_metadata.column_orders;
     const parquet::ColumnOrder* col_order = nullptr;
@@ -1422,6 +1452,12 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
   DCHECK(column_readers != NULL);
   DCHECK(column_readers->empty());
 
+  if (scan_node_->optimize_parquet_count_star()) {
+    // Column readers are not needed because we are not reading from any columns if this
+    // optimization is enabled.
+    return Status::OK();
+  }
+
   // Each tuple can have at most one position slot. We'll process this slot desc last.
   SlotDescriptor* pos_slot_desc = NULL;
 
@@ -1605,7 +1641,8 @@ Status HdfsParquetScanner::InitColumns(
     int64_t col_end = col_start + col_len;
 
     // Already validated in ValidateColumnOffsets()
-    DCHECK(col_end > 0 && col_end < file_desc->file_length);
+    DCHECK_GT(col_end, 0);
+    DCHECK_LT(col_end, file_desc->file_length);
     if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) {
       // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
       // dictionary page header size in total_compressed_size and total_uncompressed_size
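
The loop above materializes RowGroup.num_rows from the metadata that every
Parquet footer already carries. For reference, a sketch of reading the same
statistic in Java, assuming the parquet-mr API (ParquetFileReader.readFooter
and BlockMetaData.getRowCount are parquet-mr methods, not Impala code):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.hadoop.ParquetFileReader;
  import org.apache.parquet.hadoop.metadata.BlockMetaData;
  import org.apache.parquet.hadoop.metadata.ParquetMetadata;

  public class FooterNumRows {
    // Sums num_rows over all row groups of one Parquet file, i.e. the same
    // per-row-group statistic that HdfsParquetScanner writes into the
    // count(*) slot.
    public static long numRows(String file) throws Exception {
      ParquetMetadata footer =
          ParquetFileReader.readFooter(new Configuration(), new Path(file));
      long total = 0;
      for (BlockMetaData rowGroup : footer.getBlocks()) {
        total += rowGroup.getRowCount();  // RowGroup.num_rows
      }
      return total;
    }
  }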

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 469c00e..0e5e974 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -88,6 +88,11 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
           tnode.hdfs_scan_node.skip_header_line_count : 0),
       tuple_id_(tnode.hdfs_scan_node.tuple_id),
+      optimize_parquet_count_star_(
+          tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset),
+      parquet_count_star_slot_offset_(
+          tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
+          tnode.hdfs_scan_node.parquet_count_star_slot_offset : -1),
       tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
       thrift_dict_filter_conjuncts_map_(
           tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ?

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 11e5718..f71f5b4 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -154,6 +154,8 @@ class HdfsScanNodeBase : public ScanNode {
   RuntimeState* runtime_state() { return runtime_state_; }
   int skip_header_line_count() const { return skip_header_line_count_; }
   DiskIoRequestContext* reader_context() { return reader_context_; }
+  bool optimize_parquet_count_star() const { return optimize_parquet_count_star_; }
+  int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; }
 
   typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>>
     ConjunctEvaluatorsMap;
@@ -324,6 +326,15 @@ class HdfsScanNodeBase : public ScanNode {
   /// Tuple id resolved in Prepare() to set tuple_desc_
   const int tuple_id_;
 
+  /// Set to true when this scan node can optimize a count(*) query by populating the
+  /// tuple with data from the Parquet num_rows statistic. See
+  /// applyParquetCountStarOptimization() in HdfsScanNode.java.
+  const bool optimize_parquet_count_star_;
+
+  /// The byte offset of the slot for Parquet metadata if Parquet count star optimization
+  /// is enabled.
+  const int parquet_count_star_slot_offset_;
+
   /// RequestContext object to use with the disk-io-mgr for reads.
   DiskIoRequestContext* reader_context_ = nullptr;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 6299b9e..e5e7f24 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -216,6 +216,10 @@ struct THdfsScanNode {
   // Map from SlotIds to the indices in TPlanNode.conjuncts that are eligible
   // for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
+
+  // The byte offset of the slot for Parquet metadata if Parquet count star optimization
+  // is enabled.
+  10: optional i32 parquet_count_star_slot_offset
 }
 
 struct TDataSourceScanNode {
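
The new optional field doubles as the enable flag: the backend treats field
presence as turning the optimization on, as the HdfsScanNodeBase constructor
change above shows with __isset. A self-contained Java model of that
contract (names here are illustrative, not the generated Thrift code):

  import java.util.OptionalInt;

  public class CountStarSlotModel {
    // Models THdfsScanNode.parquet_count_star_slot_offset: presence of the
    // optional field enables the optimization; its value is the byte offset
    // of the count(*) slot, with -1 as the "unset" default.
    private final OptionalInt parquetCountStarSlotOffset;

    CountStarSlotModel(OptionalInt offset) {
      parquetCountStarSlotOffset = offset;
    }

    boolean optimizeParquetCountStar() {
      return parquetCountStarSlotOffset.isPresent();
    }

    int slotOffset() {
      return parquetCountStarSlotOffset.orElse(-1);
    }
  }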

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
index 4f8b4fc..3b0ad9c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
@@ -652,6 +652,17 @@ public class AggregateInfo extends AggregateInfoBase {
   }
 
   /**
+   * Returns true if there is a single count(*) materialized aggregate expression.
+   */
+  public boolean hasCountStarOnly() {
+    if (getMaterializedAggregateExprs().size() != 1) return false;
+    if (isDistinctAgg()) return false;
+    FunctionCallExpr origExpr = getMaterializedAggregateExprs().get(0);
+    if (!origExpr.getFnName().getFunction().equalsIgnoreCase("count")) return false;
+    return origExpr.getParams().isStar();
+  }
+
+  /**
    * Validates the internal state of this agg info: Checks that the number of
    * materialized slots of the output tuple corresponds to the number of materialized
    * aggregate functions plus the number of grouping exprs. Also checks that the return

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 3527b85..33fbb8b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -60,11 +60,12 @@ import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
 import org.apache.impala.rewrite.EqualityDisjunctsToInRule;
-import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.rewrite.ExprRewriteRule;
+import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.rewrite.ExtractCommonConjunctRule;
 import org.apache.impala.rewrite.FoldConstantsRule;
 import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
+import org.apache.impala.rewrite.NormalizeCountStarRule;
 import org.apache.impala.rewrite.NormalizeExprsRule;
 import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.apache.impala.service.FeSupport;
@@ -343,6 +344,7 @@ public class Analyzer {
         // Relies on FoldConstantsRule and NormalizeExprsRule.
         rules.add(SimplifyConditionalsRule.INSTANCE);
         rules.add(EqualityDisjunctsToInRule.INSTANCE);
+        rules.add(NormalizeCountStarRule.INSTANCE);
       }
       exprRewriter_ = new ExprRewriter(rules);
     }
@@ -602,7 +604,6 @@ public class Analyzer {
     Preconditions.checkNotNull(resolvedPath);
     if (resolvedPath.destTable() != null) {
       Table table = resolvedPath.destTable();
-      Preconditions.checkNotNull(table);
       if (table instanceof View) return new InlineViewRef((View) table, tableRef);
       // The table must be a base table.
       Preconditions.checkState(table instanceof HdfsTable ||
@@ -695,6 +696,7 @@ public class Analyzer {
     return globalState_.descTbl.getSlotDesc(id);
   }
 
+  public int getNumTableRefs() { return tableRefMap_.size(); }
   public TableRef getTableRef(TupleId tid) { return tableRefMap_.get(tid); }
   public ExprRewriter getConstantFolder() { return globalState_.constantFolder_; }
   public ExprRewriter getExprRewriter() { return globalState_.exprRewriter_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 8538492..f263cd5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -51,7 +51,7 @@ public class FunctionCallExpr extends Expr {
   // feeding into this Merge(). This is stored so that we can access the types of the
   // original input argument exprs. Note that the nullness affects the behaviour of
   // resetAnalysisState(), which is used during expr substitution.
-  private final FunctionCallExpr mergeAggInputFn_;
+  private FunctionCallExpr mergeAggInputFn_;
 
   // Printed in toSqlImpl(), if set. Used for merge agg fns.
   private String label_;
@@ -568,6 +568,9 @@ public class FunctionCallExpr extends Expr {
     if (hasChildCosts()) evalCost_ = getChildCosts() + FUNCTION_CALL_COST;
   }
 
+  public FunctionCallExpr getMergeAggInputFn() { return mergeAggInputFn_; }
+  public void setMergeAggInputFn(FunctionCallExpr fn) { mergeAggInputFn_ = fn; }
+
   /**
    * Checks that no special aggregate params are included in 'params' that would be
    * invalid for a scalar function. Analysis of the param exprs is not done.
@@ -604,4 +607,21 @@ public class FunctionCallExpr extends Expr {
 
   @Override
   public Expr clone() { return new FunctionCallExpr(this); }
+
+  @Override
+  protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
+      throws AnalysisException {
+    Expr e = super.substituteImpl(smap, analyzer);
+    if (!(e instanceof FunctionCallExpr)) return e;
+    FunctionCallExpr fn = (FunctionCallExpr) e;
+    FunctionCallExpr mergeFn = fn.getMergeAggInputFn();
+    if (mergeFn != null) {
+      // The merge function needs to be substituted as well.
+      Expr substitutedFn = mergeFn.substitute(smap, analyzer, true);
+      Preconditions.checkState(substitutedFn instanceof FunctionCallExpr);
+      fn.setMergeAggInputFn((FunctionCallExpr) substitutedFn);
+    }
+    return e;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java b/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java
index c0a78c7..32e23d7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java
@@ -23,7 +23,7 @@ import java.util.List;
  * Return value of the grammar production that parses function
  * parameters. These parameters can be for scalar or aggregate functions.
  */
-class FunctionParams implements Cloneable {
+public class FunctionParams implements Cloneable {
   private final boolean isStar_;
   private boolean isDistinct_;
   private boolean isIgnoreNulls_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index bc7da2b..a87cd3a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -339,7 +339,7 @@ public class TupleDescriptor {
     for (SlotDescriptor slotDesc: getSlots()) {
       if (!slotDesc.isMaterialized()) continue;
       if (slotDesc.getColumn() == null ||
-          slotDesc.getColumn().getPosition() >= hdfsTable.getNumClusteringCols()) {
+          !hdfsTable.isClusteringColumn(slotDesc.getColumn())) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 93f859d..58f5d15 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -880,6 +880,16 @@ public class BuiltinsDb extends Db {
         prefix + "16SumDecimalRemoveEPN10impala_udf15FunctionContextERKNS1_10DecimalValEPS4_",
         null, false, true, false));
 
+    // Sum that returns zero on an empty input.
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "sum_init_zero",
+        Lists.<Type>newArrayList(Type.BIGINT), Type.BIGINT, Type.BIGINT,
+        prefix + "8InitZeroIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextEPT_",
+        prefix + "9SumUpdateIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_",
+        prefix + "9SumUpdateIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_",
+        null, null,
+        prefix + "9SumRemoveIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_",
+        null, false, true, true));
+
     // Avg
     // TODO: switch to CHAR(sizeof(AvgIntermediateType) when that becomes available
     db.addBuiltin(AggregateFunction.createBuiltin(db, "avg",
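
Why a new builtin rather than the existing sum: a SQL sum over an empty
input is NULL, while count(*) must return 0 even for an empty table. A
minimal Java sketch of the two behaviors (illustrative semantics only, not
the backend UDA code; note the registration above reuses SumUpdate and only
swaps in the InitZero init function):

  import java.util.Arrays;
  import java.util.List;

  public class SumInitZeroModel {
    // Regular sum: NULL (modeled as null) over an empty input.
    static Long sum(List<Long> values) {
      Long acc = null;
      for (Long v : values) acc = (acc == null) ? v : acc + v;
      return acc;
    }

    // sum_init_zero: same update function, but the accumulator starts at 0,
    // so an empty input yields 0, matching count(*) semantics.
    static long sumInitZero(List<Long> values) {
      long acc = 0;
      for (long v : values) acc += v;
      return acc;
    }

    public static void main(String[] args) {
      List<Long> empty = Arrays.asList();
      System.out.println(sum(empty));          // null
      System.out.println(sumInitZero(empty));  // 0
    }
  }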

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index a08f0e8..fa9038a 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -27,10 +27,15 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.FunctionParams;
 import org.apache.impala.analysis.InPredicate;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
@@ -43,9 +48,9 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.FileSystemUtil;
@@ -97,6 +102,13 @@ import com.google.common.collect.Sets;
  * parquet::Statistics of row groups. If the conjuncts don't match, then whole row groups
  * will be skipped.
  *
+ * Count(*) aggregation optimization flow:
+ * The caller passes an AggregateInfo to the constructor, which this scan node uses to
+ * decide whether the optimization can be applied. The produced smap must then be
+ * applied to the AggregateInfo in this query block. We do not apply the smap in this
+ * class directly to avoid side effects and to make the code easier to reason about.
+ * See HdfsScanNode.applyParquetCountStarOptimization().
+ *
  * TODO: pass in range restrictions.
  */
 public class HdfsScanNode extends ScanNode {
@@ -126,6 +138,10 @@ public class HdfsScanNode extends ScanNode {
   private final TReplicaPreference replicaPreference_;
   private final boolean randomReplica_;
 
+  // The AggregateInfo from the query block of this scan node. Used for determining if
+  // the Parquet count(*) optimization can be applied.
+  private final AggregateInfo aggInfo_;
+
   // Number of partitions, files and bytes scanned. Set in computeScanRangeLocations().
   // Might not match 'partitions_' due to table sampling.
   private int numPartitions_ = 0;
@@ -140,6 +156,11 @@ public class HdfsScanNode extends ScanNode {
   // True if this scan node should use the MT implementation in the backend.
   private boolean useMtScanNode_;
 
+  // Substitution map that must be applied to the AggregateInfo of the same query block.
+  // We cannot use PlanNode.outputSmap_ for this purpose because we don't want the smap
+  // entries to be propagated outside the query block.
+  protected ExprSubstitutionMap optimizedAggSmap_;
+
   // Conjuncts that can be evaluated while materializing the items (tuples) of
   // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that
   // tuple. Uses a linked hash map for consistent display in explain.
@@ -182,6 +203,10 @@ public class HdfsScanNode extends ScanNode {
   // parquet::Statistics.
   private TupleDescriptor minMaxTuple_;
 
+  // Slot that is used to record the Parquet metadata for the count(*) aggregation if
+  // this scan node has the count(*) optimization enabled.
+  private SlotDescriptor countStarSlot_ = null;
+
   /**
    * Construct a node to scan given data files into tuples described by 'desc',
    * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -189,7 +214,7 @@ public class HdfsScanNode extends ScanNode {
    * class comments above for details.
    */
   public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
-      List<HdfsPartition> partitions, TableRef hdfsTblRef) {
+      List<HdfsPartition> partitions, TableRef hdfsTblRef, AggregateInfo aggInfo) {
     super(id, desc, "SCAN HDFS");
     Preconditions.checkState(desc.getTable() instanceof HdfsTable);
     tbl_ = (HdfsTable)desc.getTable();
@@ -201,6 +226,7 @@ public class HdfsScanNode extends ScanNode {
     HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable();
     Preconditions.checkState(tbl_ == hdfsTable);
     StringBuilder error = new StringBuilder();
+    aggInfo_ = aggInfo;
     skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error);
     if (error.length() > 0) {
       // Any errors should already have been caught during analysis.
@@ -218,6 +244,47 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
+   * Adds a new slot descriptor to the tuple descriptor of this scan. The new slot will be
+   * used for storing the data extracted from the Parquet num rows statistic. Also adds an
+   * entry to 'optimizedAggSmap_' that substitutes count(*) with
+   * sum_init_zero(<new-slotref>). Returns the new slot descriptor.
+   */
+  private SlotDescriptor applyParquetCountStarOptimization(Analyzer analyzer) {
+    FunctionCallExpr countFn = new FunctionCallExpr(new FunctionName("count"),
+        FunctionParams.createStarParam());
+    countFn.analyzeNoThrow(analyzer);
+
+    // Create the sum function.
+    SlotDescriptor sd = analyzer.addSlotDescriptor(getTupleDesc());
+    sd.setType(Type.BIGINT);
+    sd.setIsMaterialized(true);
+    sd.setIsNullable(false);
+    sd.setLabel("parquet-stats: num_rows");
+    ArrayList<Expr> args = Lists.newArrayList();
+    args.add(new SlotRef(sd));
+    FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args);
+    sumFn.analyzeNoThrow(analyzer);
+
+    optimizedAggSmap_ = new ExprSubstitutionMap();
+    optimizedAggSmap_.put(countFn, sumFn);
+    return sd;
+  }
+
+  /**
+   * Returns true if the Parquet count(*) optimization can be applied to the query block
+   * of this scan node.
+   */
+  private boolean canApplyParquetCountStarOptimization(Analyzer analyzer,
+      Set<HdfsFileFormat> fileFormats) {
+    if (analyzer.getNumTableRefs() != 1) return false;
+    if (aggInfo_ == null || !aggInfo_.hasCountStarOnly()) return false;
+    if (fileFormats.size() != 1) return false;
+    if (!fileFormats.contains(HdfsFileFormat.PARQUET)) return false;
+    if (!conjuncts_.isEmpty()) return false;
+    return desc_.getMaterializedSlots().isEmpty() || desc_.hasClusteringColsOnly();
+  }
+
+  /**
    * Populate collectionConjuncts_ and scanRanges_.
    */
   @Override
@@ -227,7 +294,6 @@ public class HdfsScanNode extends ScanNode {
 
     assignCollectionConjuncts(analyzer);
     computeDictionaryFilterConjuncts(analyzer);
-    computeMemLayout(analyzer);
 
     // compute scan range locations with optional sampling
     Set<HdfsFileFormat> fileFormats = computeScanRangeLocations(analyzer);
@@ -248,7 +314,16 @@ public class HdfsScanNode extends ScanNode {
       computeMinMaxTupleAndConjuncts(analyzer);
     }
 
-    // do this at the end so it can take all conjuncts and scan ranges into account
+    if (canApplyParquetCountStarOptimization(analyzer, fileFormats)) {
+      Preconditions.checkState(desc_.getPath().destTable() != null);
+      Preconditions.checkState(collectionConjuncts_.isEmpty());
+      countStarSlot_ = applyParquetCountStarOptimization(analyzer);
+    }
+
+    computeMemLayout(analyzer);
+
+    // Do this at the end so that it can take all conjuncts, scan ranges and the mem
+    // layout into account.
     computeStats(analyzer);
 
     // TODO: do we need this?
@@ -310,10 +385,6 @@ public class HdfsScanNode extends ScanNode {
     }
   }
 
-  public boolean isPartitionedTable() {
-    return desc_.getTable().getNumClusteringCols() > 0;
-  }
-
   /**
    * Populates the collection conjuncts, materializes their required slots, and marks
    * the conjuncts as assigned, if it is correct to do so. Some conjuncts may have to
@@ -855,6 +926,11 @@ public class HdfsScanNode extends ScanNode {
       msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_);
     }
     msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
+    Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null));
+    if (countStarSlot_ != null) {
+      msg.hdfs_scan_node.setParquet_count_star_slot_offset(
+          countStarSlot_.getByteOffset());
+    }
     if (!minMaxConjuncts_.isEmpty()) {
       for (Expr e: minMaxConjuncts_) {
         msg.hdfs_scan_node.addToMin_max_conjuncts(e.treeToThrift());
@@ -1016,6 +1092,8 @@ public class HdfsScanNode extends ScanNode {
         MAX_IO_BUFFERS_PER_THREAD * BackendConfig.INSTANCE.getReadSize();
   }
 
+  public ExprSubstitutionMap getOptimizedAggSmap() { return optimizedAggSmap_; }
+
   @Override
   public boolean isTableMissingTableStats() {
     if (extrapolatedNumRows_ >= 0) return false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 0b36922..8448da5 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -93,7 +93,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // assigned to a fragment. Set and maintained by enclosing PlanFragment.
   protected PlanFragment fragment_;
 
-  // if set, needs to be applied by parent node to reference this node's output
+  // If set, needs to be applied by parent node to reference this node's output. The
+  // entries need to be propagated all the way to the root node.
   protected ExprSubstitutionMap outputSmap_;
 
   // global state of planning wrt conjunct assignment; used by planner as a shortcut
@@ -206,7 +207,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   /**
    * Set the limit_ to the given limit_ only if the limit_ hasn't been set, or the new limit_
    * is lower.
-   * @param limit_
    */
   public void setLimit(long limit) {
     if (limit_ == -1 || (limit != -1 && limit_ > limit)) limit_ = limit;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index b85532f..1373e89 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -168,7 +168,7 @@ abstract public class ScanNode extends PlanNode {
 
   public boolean isTableMissingColumnStats() {
     for (SlotDescriptor slot: desc_.getSlots()) {
-      if (!slot.getStats().hasStats()) return true;
+      if (slot.getColumn() != null && !slot.getStats().hasStats()) return true;
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 62c045b..3e0692b 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -31,12 +31,13 @@ import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalyticInfo;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BaseTableRef;
-import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.BinaryPredicate.Operator;
+import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.CollectionTableRef;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprId;
 import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.InlineViewRef;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.NullLiteral;
@@ -51,8 +52,8 @@ import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
-import org.apache.impala.analysis.UnionStmt;
 import org.apache.impala.analysis.UnionStmt.UnionOperand;
+import org.apache.impala.analysis.UnionStmt;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.DataSourceTable;
 import org.apache.impala.catalog.HBaseTable;
@@ -601,24 +602,22 @@ public class SingleNodePlanner {
       return createAggregationPlan(selectStmt, analyzer, emptySetNode);
     }
 
-    AggregateInfo aggInfo = selectStmt.getAggInfo();
-    // For queries which contain partition columns only, we may use the metadata instead
-    // of table scans. This is only feasible if all materialized aggregate expressions
-    // have distinct semantics. Please see createHdfsScanPlan() for details.
-    boolean fastPartitionKeyScans =
-        analyzer.getQueryCtx().client_request.query_options.optimize_partition_key_scans &&
-        aggInfo != null && aggInfo.hasAllDistinctAgg();
-
     // Separate table refs into parent refs (uncorrelated or absolute) and
     // subplan refs (correlated or relative), and generate their plan.
     List<TableRef> parentRefs = Lists.newArrayList();
     List<SubplanRef> subplanRefs = Lists.newArrayList();
     computeParentAndSubplanRefs(
         selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs);
-    PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, fastPartitionKeyScans,
-        analyzer);
+    AggregateInfo aggInfo = selectStmt.getAggInfo();
+    PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, aggInfo, analyzer);
     // add aggregation, if any
-    if (aggInfo != null) root = createAggregationPlan(selectStmt, analyzer, root);
+    if (aggInfo != null) {
+      if (root instanceof HdfsScanNode) {
+        aggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
+        aggInfo.getMergeAggInfo().substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
+      }
+      root = createAggregationPlan(selectStmt, analyzer, root);
+    }
 
     // All the conjuncts_ should be assigned at this point.
    // TODO: Re-enable this check here and/or elsewhere.
@@ -763,19 +762,16 @@ public class SingleNodePlanner {
 
   /**
    * Returns a plan tree for evaluating the given parentRefs and subplanRefs.
-   *
-   * 'fastPartitionKeyScans' indicates whether to try to produce slots with
-   * metadata instead of table scans.
    */
   private PlanNode createTableRefsPlan(List<TableRef> parentRefs,
-      List<SubplanRef> subplanRefs, boolean fastPartitionKeyScans,
-      Analyzer analyzer) throws ImpalaException {
+      List<SubplanRef> subplanRefs, AggregateInfo aggInfo, Analyzer analyzer)
+      throws ImpalaException {
     // create plans for our table refs; use a list here instead of a map to
     // maintain a deterministic order of traversing the TableRefs during join
     // plan generation (helps with tests)
     List<Pair<TableRef, PlanNode>> parentRefPlans = Lists.newArrayList();
     for (TableRef ref: parentRefs) {
-      PlanNode root = createTableRefNode(ref, fastPartitionKeyScans, analyzer);
+      PlanNode root = createTableRefNode(ref, aggInfo, analyzer);
       Preconditions.checkNotNull(root);
       root = createSubplan(root, subplanRefs, true, analyzer);
       parentRefPlans.add(new Pair<TableRef, PlanNode>(ref, root));
@@ -845,7 +841,7 @@ public class SingleNodePlanner {
     // their containing SubplanNode. Also, further plan generation relies on knowing
     // whether we are in a subplan context or not (see computeParentAndSubplanRefs()).
     ctx_.pushSubplan(subplanNode);
-    PlanNode subplan = createTableRefsPlan(applicableRefs, subplanRefs, false, analyzer);
+    PlanNode subplan = createTableRefsPlan(applicableRefs, subplanRefs, null, analyzer);
     ctx_.popSubplan();
     subplanNode.setSubplan(subplan);
     subplanNode.init(analyzer);
@@ -1194,11 +1190,10 @@ public class SingleNodePlanner {
   /**
    * Create a node to materialize the slots in the given HdfsTblRef.
    *
-   * If 'hdfsTblRef' only contains partition columns and 'fastPartitionKeyScans'
-   * is true, the slots may be produced directly in this function using the metadata.
-   * Otherwise, a HdfsScanNode will be created.
+   * The given 'aggInfo' is used for detecting and applying optimizations that span both
+   * the scan and aggregation.
    */
-  private PlanNode createHdfsScanPlan(TableRef hdfsTblRef, boolean fastPartitionKeyScans,
+  private PlanNode createHdfsScanPlan(TableRef hdfsTblRef, AggregateInfo aggInfo,
       List<Expr> conjuncts, Analyzer analyzer) throws ImpalaException {
     TupleDescriptor tupleDesc = hdfsTblRef.getDesc();
 
@@ -1210,6 +1205,13 @@ public class SingleNodePlanner {
     // Mark all slots referenced by the remaining conjuncts as materialized.
     analyzer.materializeSlots(conjuncts);
 
+    // For queries which contain partition columns only, we may use the metadata instead
+    // of table scans. This is only feasible if all materialized aggregate expressions
+    // have distinct semantics. See the fast partition key scan logic below for details.
+    boolean fastPartitionKeyScans =
+        analyzer.getQueryCtx().client_request.query_options.optimize_partition_key_scans &&
+        aggInfo != null && aggInfo.hasAllDistinctAgg();
+
     // If the optimization for partition key scans with metadata is enabled,
     // try evaluating with metadata first. If not, fall back to scanning.
     if (fastPartitionKeyScans && tupleDesc.hasClusteringColsOnly()) {
@@ -1245,7 +1247,7 @@ public class SingleNodePlanner {
     } else {
       ScanNode scanNode =
           new HdfsScanNode(ctx_.getNextNodeId(), tupleDesc, conjuncts, partitions,
-              hdfsTblRef);
+              hdfsTblRef, aggInfo);
       scanNode.init(analyzer);
       return scanNode;
     }
@@ -1254,13 +1256,13 @@ public class SingleNodePlanner {
   /**
    * Create node for scanning all data files of a particular table.
    *
-   * 'fastPartitionKeyScans' indicates whether to try to produce the slots with
-   * metadata instead of table scans. Only applicable to HDFS tables.
+   * The given 'aggInfo' is used for detecting and applying optimizations that span both
+   * the scan and aggregation. Only applicable to HDFS table refs.
    *
    * Throws if a PlanNode.init() failed or if planning of the given
    * table ref is not implemented.
    */
-  private PlanNode createScanNode(TableRef tblRef, boolean fastPartitionKeyScans,
+  private PlanNode createScanNode(TableRef tblRef, AggregateInfo aggInfo,
       Analyzer analyzer) throws ImpalaException {
     ScanNode scanNode = null;
 
@@ -1289,9 +1291,10 @@ public class SingleNodePlanner {
 
     Table table = tblRef.getTable();
     if (table instanceof HdfsTable) {
-      return createHdfsScanPlan(tblRef, fastPartitionKeyScans, conjuncts, analyzer);
+      return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer);
     } else if (table instanceof DataSourceTable) {
-      scanNode = new DataSourceScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts);
+      scanNode = new DataSourceScanNode(ctx_.getNextNodeId(), tblRef.getDesc(),
+          conjuncts);
       scanNode.init(analyzer);
       return scanNode;
     } else if (table instanceof HBaseTable) {
@@ -1496,18 +1499,17 @@ public class SingleNodePlanner {
    * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
    * CollectionTableRef or an InlineViewRef.
    *
-   * 'fastPartitionKeyScans' indicates whether to try to produce the slots with
-   * metadata instead of table scans. Only applicable to BaseTableRef which is also
-   * an HDFS table.
+   * The given 'aggInfo' is used for detecting and applying optimizations that span both
+   * the scan and aggregation. Only applicable to HDFS table refs.
    *
    * Throws if a PlanNode.init() failed or if planning of the given
    * table ref is not implemented.
    */
-  private PlanNode createTableRefNode(TableRef tblRef, boolean fastPartitionKeyScans,
+  private PlanNode createTableRefNode(TableRef tblRef, AggregateInfo aggInfo,
       Analyzer analyzer) throws ImpalaException {
     PlanNode result = null;
     if (tblRef instanceof BaseTableRef) {
-      result = createScanNode(tblRef, fastPartitionKeyScans, analyzer);
+      result = createScanNode(tblRef, aggInfo, analyzer);
     } else if (tblRef instanceof CollectionTableRef) {
       if (tblRef.isRelative()) {
         Preconditions.checkState(ctx_.hasSubplan());
@@ -1515,7 +1517,7 @@ public class SingleNodePlanner {
             (CollectionTableRef) tblRef);
         result.init(analyzer);
       } else {
-        result = createScanNode(tblRef, false, analyzer);
+        result = createScanNode(tblRef, null, analyzer);
       }
     } else if (tblRef instanceof InlineViewRef) {
       result = createInlineViewPlan(analyzer, (InlineViewRef) tblRef);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/main/java/org/apache/impala/rewrite/NormalizeCountStarRule.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/rewrite/NormalizeCountStarRule.java b/fe/src/main/java/org/apache/impala/rewrite/NormalizeCountStarRule.java
new file mode 100644
index 0000000..90556c1
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/rewrite/NormalizeCountStarRule.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.rewrite;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.FunctionParams;
+import org.apache.impala.common.AnalysisException;
+
+/**
+ * Replaces count(<literal>) with an equivalent count(*).
+ *
+ * Examples:
+ * count(1)    --> count(*)
+ * count(2017) --> count(*)
+ * count(null) --> count(null)
+ */
+public class NormalizeCountStarRule implements ExprRewriteRule {
+  public static ExprRewriteRule INSTANCE = new NormalizeCountStarRule();
+
+  @Override
+  public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
+    if (!(expr instanceof FunctionCallExpr)) return expr;
+    FunctionCallExpr origExpr = (FunctionCallExpr) expr;
+    if (!origExpr.getFnName().getFunction().equalsIgnoreCase("count")) return expr;
+    if (origExpr.getParams().isStar()) return expr;
+    if (origExpr.getParams().isDistinct()) return expr;
+    if (origExpr.getParams().exprs().size() != 1) return expr;
+    Expr child = origExpr.getChild(0);
+    if (!child.isLiteral()) return expr;
+    if (child.isNullLiteral()) return expr;
+    FunctionCallExpr result = new FunctionCallExpr(new FunctionName("count"),
+        FunctionParams.createStarParam());
+    return result;
+  }
+
+  private NormalizeCountStarRule() {}
+}
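
A sketch of how the rule is applied, following the rules-list pattern from
the Analyzer change above (ExprRewriter.rewrite(expr, analyzer) is assumed
to be the existing rewriter entry point):

  import java.util.List;

  import org.apache.impala.analysis.Analyzer;
  import org.apache.impala.analysis.Expr;
  import org.apache.impala.common.AnalysisException;
  import org.apache.impala.rewrite.ExprRewriteRule;
  import org.apache.impala.rewrite.ExprRewriter;
  import org.apache.impala.rewrite.NormalizeCountStarRule;

  import com.google.common.collect.Lists;

  public class NormalizeCountStarUsage {
    // count(1) and count(123) come back as count(*); count(null),
    // count(distinct c) and count(c) are returned unchanged.
    static Expr normalize(Expr analyzedExpr, Analyzer analyzer)
        throws AnalysisException {
      List<ExprRewriteRule> rules = Lists.newArrayList();
      rules.add(NormalizeCountStarRule.INSTANCE);
      return new ExprRewriter(rules).rewrite(analyzedExpr, analyzer);
    }
  }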

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
index d20aedf..e49c652 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
@@ -24,13 +24,14 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
 import org.apache.impala.rewrite.EqualityDisjunctsToInRule;
-import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.apache.impala.rewrite.ExprRewriteRule;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.rewrite.ExtractCommonConjunctRule;
 import org.apache.impala.rewrite.FoldConstantsRule;
 import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
+import org.apache.impala.rewrite.NormalizeCountStarRule;
 import org.apache.impala.rewrite.NormalizeExprsRule;
+import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -510,4 +511,18 @@ public class ExprRewriteRulesTest extends FrontendTestBase {
             + "(select smallint_col from functional.alltypessmall where smallint_col<10)",
         edToInrule, null);
   }
+
+  @Test
+  public void TestNormalizeCountStarRule() throws AnalysisException {
+    ExprRewriteRule rule = NormalizeCountStarRule.INSTANCE;
+
+    RewritesOk("count(1)", rule, "count(*)");
+    RewritesOk("count(5)", rule, "count(*)");
+
+    // Verify that these don't get rewritten.
+    RewritesOk("count(null)", rule, null);
+    RewritesOk("count(id)", rule, null);
+    RewritesOk("count(1 + 1)", rule, null);
+    RewritesOk("count(1 + null)", rule, null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index d33e678..4641d68 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -302,6 +302,9 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testParquetStatsAgg() { runPlannerTestFile("parquet-stats-agg"); }
+
+  @Test
   public void testParquetFiltering() {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
index ffabd6b..a707e44 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
@@ -41,7 +41,7 @@ PLAN-ROOT SINK
 select count(*) from functional_parquet.alltypes
 ---- DISTRIBUTEDPLAN
 Per-Host Resource Reservation: Memory=0B
-Per-Host Resource Estimates: Memory=20.00MB
+Per-Host Resource Estimates: Memory=26.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypes
 
@@ -53,10 +53,10 @@ PLAN-ROOT SINK
 02:EXCHANGE [UNPARTITIONED]
 |
 01:AGGREGATE
-|  output: count(*)
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=179.68KB
 ====
 # > 3000 rows returned to coordinator: codegen should be enabled
 select * from functional_parquet.alltypes
@@ -71,7 +71,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=179.68KB
 ====
 # Optimisation is enabled for join producing < 3000 rows
 select count(*)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
index b5361a6..0a41fb9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
@@ -445,11 +445,11 @@ where t.x + t.y > 10 and t.x > 0 and t.y > 1
 PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
-|  output: count(1), count:merge(1)
-|  having: count(1) > 0, zeroifnull(count(1)) > 1, count(1) + zeroifnull(count(1)) > 10
+|  output: count(1), count:merge(*)
+|  having: count(1) > 0, zeroifnull(count(*)) > 1, count(1) + zeroifnull(count(*)) > 10
 |
 01:AGGREGATE
-|  output: count(1)
+|  output: count(*)
 |  group by: 1
 |
 00:SCAN HDFS [functional.alltypes]
@@ -458,22 +458,22 @@ PLAN-ROOT SINK
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
-|  output: count:merge(1), count:merge(1)
-|  having: count(1) > 0, zeroifnull(count(1)) > 1, count(1) + zeroifnull(count(1)) > 10
+|  output: count:merge(1), count:merge(*)
+|  having: count(1) > 0, zeroifnull(count(*)) > 1, count(1) + zeroifnull(count(*)) > 10
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
-|  output: count(1), count:merge(1)
+|  output: count(1), count:merge(*)
 |
 04:AGGREGATE
-|  output: count:merge(1)
+|  output: count:merge(*)
 |  group by: 1
 |
 03:EXCHANGE [HASH(1)]
 |
 01:AGGREGATE [STREAMING]
-|  output: count(1)
+|  output: count(*)
 |  group by: 1
 |
 00:SCAN HDFS [functional.alltypes]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
index 08dbd4d..c4b45ba 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
@@ -1458,7 +1458,7 @@ on (t3.string_col = t1.string_col_1 and t3.date_string_col = t1.string_col_2)
 PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
-|  output: count(1)
+|  output: count(*)
 |
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t2.string_col = t3.string_col, t1.string_col = t3.date_string_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
new file mode 100644
index 0000000..1afe61c
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
@@ -0,0 +1,349 @@
+# Verify that the parquet count(*) optimization is applied in all count(*) or
+# count(<literal>) cases when scanning a Parquet table. In the last case, we are scanning
+# a text table, so the optimization is not applied.
+select count(*) from functional_parquet.alltypes
+union all
+select count(1) from functional_parquet.alltypes
+union all
+select count(123) from functional_parquet.alltypes
+union all
+select count(*) from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|
+|--08:AGGREGATE [FINALIZE]
+|  |  output: count(*)
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |
+|  05:SCAN HDFS [functional_parquet.alltypes]
+|     partitions=24/24 files=24 size=178.13KB
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |
+|  03:SCAN HDFS [functional_parquet.alltypes]
+|     partitions=24/24 files=24 size=178.13KB
+|
+02:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|
+01:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|
+|--16:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |
+|  15:EXCHANGE [UNPARTITIONED]
+|  |
+|  08:AGGREGATE
+|  |  output: count(*)
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+|--14:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |
+|  13:EXCHANGE [UNPARTITIONED]
+|  |
+|  06:AGGREGATE
+|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |
+|  05:SCAN HDFS [functional_parquet.alltypes]
+|     partitions=24/24 files=24 size=178.13KB
+|
+|--12:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |
+|  11:EXCHANGE [UNPARTITIONED]
+|  |
+|  04:AGGREGATE
+|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |
+|  03:SCAN HDFS [functional_parquet.alltypes]
+|     partitions=24/24 files=24 size=178.13KB
+|
+10:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|
+01:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# Verify that the parquet count(*) optimization is applied even if there is more than
+# one item in the select list.
+select count(*), count(1), count(123) from functional_parquet.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# Select count(<partition col>) - the optimization should be disabled because it's not a
+# count(<literal>) or count(*) aggregate function.
+select count(year) from functional_parquet.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(year)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# Group by partition columns.
+select month, count(*) from functional_parquet.alltypes group by month, year
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  group by: month, year
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# The optimization is disabled because tinyint_col is not a partition col.
+select tinyint_col, count(*) from functional_parquet.alltypes group by tinyint_col, year
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: tinyint_col, year
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# The optimization is disabled because there are two aggregate functions.
+select avg(year), count(*) from functional_parquet.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(year), count(*)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# Optimization is not applied because the inner count(*) is not materialized. The outer
+# count(*) does not reference a base table.
+select count(*) from (select count(*) from functional_parquet.alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+01:AGGREGATE [FINALIZE]
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# The optimization is applied if count(*) is in the having clause.
+select 1 from functional_parquet.alltypes having count(*) > 1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  having: count(*) > 1
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# The count(*) optimization is applied in the inline view.
+select count(*), count(a) from (select count(1) as a from functional_parquet.alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*), count(count(*))
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# The count(*) optimization is applied to the inline view even if there is a join.
+select *
+from functional.alltypes x inner join (
+  select count(1) as a from functional_parquet.alltypes group by year
+) t on x.id = t.a;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: x.id = count(*)
+|  runtime filters: RF000 <- count(*)
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  group by: year
+|  |
+|  01:SCAN HDFS [functional_parquet.alltypes]
+|     partitions=24/24 files=24 size=178.13KB
+|
+00:SCAN HDFS [functional.alltypes x]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> x.id
+====
+# The count(*) optimization is not applied if there is more than 1 table ref.
+select count(*) from functional_parquet.alltypes a, functional_parquet.alltypes b
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|
+|--01:SCAN HDFS [functional_parquet.alltypes b]
+|     partitions=24/24 files=24 size=178.13KB
+|
+00:SCAN HDFS [functional_parquet.alltypes a]
+   partitions=24/24 files=24 size=178.13KB
+====
+# The count(*) optimization is applied if there are predicates on partition columns.
+select count(1) from functional_parquet.alltypes where year < 2010 and month > 8;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=4/24 files=4 size=29.75KB
+====
+# tinyint_col is not a partition column so the optimization is disabled.
+select count(1) from functional_parquet.alltypes where year < 2010 and tinyint_col > 8;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=12/24 files=12 size=89.05KB
+   predicates: tinyint_col > 8
+====
+# Optimization is applied after constant folding.
+select count(1 + 2 + 3) from functional_parquet.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# Optimization is not applied to count(null).
+select count(1 + null + 3) from functional_parquet.alltypes
+union all
+select count(null) from functional_parquet.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: count(NULL)
+|  |
+|  03:SCAN HDFS [functional_parquet.alltypes]
+|     partitions=24/24 files=24 size=178.13KB
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(NULL + 3)
+|
+01:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# Optimization is not applied when selecting from an empty table.
+select count(*) from functional_parquet.emptytable
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional_parquet.emptytable]
+   partitions=0/0 files=0 size=0B
+====
+# Optimization is not applied when all partitions are pruned.
+select count(1) from functional_parquet.alltypes where year = -1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=0/24 files=0 size=0B
+====
+# Optimization is not applied across query blocks, even though it would be correct here.
+select count(*) from (select int_col from functional_parquet.alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# Optimization is not applied when there is a distinct agg.
+select count(*), count(distinct 1) from functional_parquet.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(1), count:merge(*)
+|
+01:AGGREGATE
+|  output: count(*)
+|  group by: 1
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=178.13KB
+====
+# The optimization is applied here because only the count(*) and a partition column are
+# materialized. Non-materialized agg exprs are ignored.
+select year, cnt from (
+  select year, count(bigint_col), count(*) cnt, avg(int_col)
+  from functional_parquet.alltypes
+  where month=1
+  group by year
+) t
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  group by: year
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=2/24 files=2 size=15.01KB
+====
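
Taken together, the planner tests above pin down when the rewrite fires. As a
summary, here is a minimal Java sketch of those eligibility rules; every name
in it is an illustrative assumption, not the actual HdfsScanNode or
AggregateInfo API:

// All names here are illustrative, not Impala frontend code.
public final class CountStarOptimizationSketch {
  // Returns true if count(*) can be answered from Parquet num_rows metadata.
  public static boolean canUseParquetNumRows(
      boolean allParquet,         // only Parquet row groups carry num_rows
      int numTableRefs,           // a join disables the rewrite
      int numAggFns,              // e.g. "avg(year), count(*)" has two
      boolean aggIsCountStar,     // count(<literal>) normalizes to count(*)
      boolean onlyPartitionSlots, // predicates/group-by on partition cols only
      boolean hasDistinctAgg) {   // count(distinct ...) disables the rewrite
    return allParquet && numTableRefs == 1 && numAggFns == 1
        && aggIsCountStar && onlyPartitionSlots && !hasDistinctAgg;
  }
}

In these tests, NormalizeCountStarRule is what lets count(1) and count(123)
satisfy the count(*) condition, per the commit message.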

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 0d527de..bb97c26 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -320,7 +320,7 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 select count(*) from tpch_parquet.lineitem
 ---- DISTRIBUTEDPLAN
 Per-Host Resource Reservation: Memory=0B
-Per-Host Resource Estimates: Memory=20.00MB
+Per-Host Resource Estimates: Memory=90.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
@@ -337,7 +337,7 @@ PLAN-ROOT SINK
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 01:AGGREGATE
-|  output: count(*)
+|  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B
 |  tuple-ids=1 row-size=8B cardinality=1
 |
@@ -346,8 +346,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
    stats-rows=6001215 extrapolated-rows=disabled
    table stats: rows=6001215 size=193.92MB
    column stats: all
-   mem-estimate=0B mem-reservation=0B
-   tuple-ids=0 row-size=0B cardinality=6001215
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=8B cardinality=6001215
 ---- PARALLELPLANS
 Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=180.00MB
@@ -367,7 +367,7 @@ PLAN-ROOT SINK
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 01:AGGREGATE
-|  output: count(*)
+|  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B
 |  tuple-ids=1 row-size=8B cardinality=1
 |
@@ -377,7 +377,7 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
    table stats: rows=6001215 size=193.92MB
    column stats: all
    mem-estimate=80.00MB mem-reservation=0B
-   tuple-ids=0 row-size=0B cardinality=6001215
+   tuple-ids=0 row-size=8B cardinality=6001215
 ====
 # Sort
 select *
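
A note on the estimate changes above: the row size grows from 0B to 8B because
the scanner tuple now materializes a single 8-byte slot holding each row
group's num_rows statistic, so the scan presumably no longer qualifies for the
0B estimate given to zero-byte rows and instead gets the same 80.00MB estimate
the parallel plan already shows. Per host that is 80.00MB (scan) + 10.00MB
(aggregation) = 90.00MB.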

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
index 524c63b..de1c507 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
@@ -1255,3 +1255,67 @@ where t2.int_col IN (t1.int_col_1, t1.int_col)
 ---- TYPES
 TIMESTAMP,BIGINT
 ====
+---- QUERY
+# IMPALA-5036: Tests the correctness of the Parquet count(*) optimization.
+select count(1)
+from functional_parquet.alltypes
+---- RESULTS
+7300
+---- TYPES
+bigint
+====
+---- QUERY
+# IMPALA-5036: Parquet count(*) optimization with predicates on the partition columns.
+select count(1)
+from functional_parquet.alltypes where year < 2010 and month > 8
+---- RESULTS
+1220
+---- TYPES
+bigint
+====
+---- QUERY
+# IMPALA-5036: Parquet count(*) optimization with group by partition columns.
+select year, month, count(1)
+from functional_parquet.alltypes where month > 10 group by year, month
+---- RESULTS
+2009,11,300
+2009,12,310
+2010,11,300
+2010,12,310
+---- TYPES
+int, int, bigint
+====
+---- QUERY
+# IMPALA-5036: Parquet count(*) optimization with both group by and predicates on
+# partition columns.
+select count(1)
+from functional_parquet.alltypes where year < 2010 and month > 8
+group by month
+---- RESULTS
+310
+300
+310
+300
+---- TYPES
+bigint
+====
+---- QUERY
+# IMPALA-5036: Parquet count(*) optimization with the result going into a join.
+select x.bigint_col from functional.alltypes x
+  inner join (
+    select count(1) as a from functional_parquet.alltypes group by year
+  ) t on x.id = t.a;
+---- RESULTS
+0
+0
+---- TYPES
+bigint
+====
+---- QUERY
+# IMPALA-5036: Parquet count(*) optimization with the agg function in the having clause.
+select 1 from functional_parquet.alltypes having count(*) > 1
+---- RESULTS
+1
+---- TYPES
+tinyint
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
new file mode 100644
index 0000000..3b1c33b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
@@ -0,0 +1,117 @@
+====
+---- QUERY
+# Tests the correctness of the Parquet count(*) optimization.
+select count(1)
+from functional_parquet.alltypes
+---- RESULTS
+7300
+---- TYPES
+bigint
+====
+---- QUERY
+# Parquet count(*) optimization with predicates on the partition columns.
+select count(1)
+from functional_parquet.alltypes where year < 2010 and month > 8
+---- RESULTS
+1220
+---- TYPES
+bigint
+====
+---- QUERY
+# Parquet count(*) optimization with group by partition columns.
+select year, month, count(1)
+from functional_parquet.alltypes group by year, month
+---- RESULTS
+2009,1,310
+2009,2,280
+2009,3,310
+2009,4,300
+2009,5,310
+2009,6,300
+2009,7,310
+2009,8,310
+2009,9,300
+2009,10,310
+2009,11,300
+2009,12,310
+2010,1,310
+2010,2,280
+2010,3,310
+2010,4,300
+2010,5,310
+2010,6,300
+2010,7,310
+2010,8,310
+2010,9,300
+2010,10,310
+2010,11,300
+2010,12,310
+---- TYPES
+int, int, bigint
+====
+---- QUERY
+# Parquet count(*) optimization with both group by and predicates on partition columns.
+select count(1)
+from functional_parquet.alltypes where year < 2010 and month > 8
+group by month
+---- RESULTS
+310
+300
+310
+300
+---- TYPES
+bigint
+====
+---- QUERY
+# Parquet count(*) optimization with the result going into a join.
+select x.bigint_col from functional.alltypes x
+  inner join (
+    select count(1) as a from functional_parquet.alltypes group by year
+  ) t on x.id = t.a;
+---- RESULTS
+0
+0
+---- TYPES
+bigint
+====
+---- QUERY
+# Parquet count(*) optimization with the agg function in the having clause.
+select 1 from functional_parquet.alltypes having count(*) > 1
+---- RESULTS
+1
+---- TYPES
+tinyint
+====
+---- QUERY
+# Verify that 0 is returned for count(*) on an empty table.
+select count(1) from functional_parquet.emptytable
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
+# Verify that 0 is returned when all partitions are pruned.
+select count(1) from functional_parquet.alltypes where year = -1
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
+# Test different row group size combinations.
+select count(*) from functional_parquet.lineitem_multiblock
+union all
+select count(*) from functional_parquet.lineitem_multiblock_one_row_group
+union all
+select count(*) from functional_parquet.lineitem_sixblocks
+union all
+select count(*) from tpch_parquet.lineitem
+---- RESULTS
+20000
+40000
+40000
+6001215
+---- TYPES
+bigint
+====
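
These functional tests exercise the count-to-sum substitution described in the
commit message: the sum's accumulator must start at 0 rather than NULL, since a
plain SUM over empty input is NULL while count(*) must return 0. A minimal
sketch of that behavior (illustrative names, not Impala's actual UDA
interface):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class SumInitZeroSketch {
  // The accumulator starts at 0 (never NULL), so merging fragments that saw
  // no num_rows values still yields a well-defined count.
  static long sumInitZero(List<Long> perRowGroupNumRows) {
    long acc = 0;
    for (long n : perRowGroupNumRows) acc += n;
    return acc;
  }

  public static void main(String[] args) {
    System.out.println(sumInitZero(Arrays.asList(310L, 280L)));  // 590
    System.out.println(sumInitZero(Collections.emptyList()));    // 0, not NULL
  }
}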

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57d7c614/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
new file mode 100644
index 0000000..d03b4c9
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
@@ -0,0 +1,460 @@
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where int_col < 0
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+set explain_level=2;
+explain select id, bool_col from functional_parquet.alltypessmall where int_col < 0;
+---- RESULTS: VERIFY_IS_SUBSET
+'   parquet statistics predicates: int_col < 0'
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where smallint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where int_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where bigint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where float_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where double_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+# Test with inverted predicate
+select id, bool_col from functional_parquet.alltypessmall where -1 > int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col > 9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where smallint_col > 9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where int_col > 9
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where bigint_col > 90
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where float_col > 9.9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where double_col > 99
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col >= 10
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col <= 0
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col >= 9
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col = -1
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col = 10
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+set explain_level=2;
+explain select count(*) from functional_parquet.alltypessmall where tinyint_col = 10
+---- RESULTS: VERIFY_IS_SUBSET
+'   parquet statistics predicates: tinyint_col = 10'
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where id >= 30 and id <= 80
+---- RESULTS
+51
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# Mix with partitioning columns
+select count(*) from functional_parquet.alltypessmall where int_col < 0 and year < 2012
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+# Test that without expr rewrites, and thus without constant folding, constant
+# exprs can still be used to prune row groups.
+set enable_expr_rewrites=0;
+select id, bool_col from functional_parquet.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where 5 + 5 < int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+# Test that without expr rewrites, and thus without constant folding, constant
+# exprs can still be used to prune row groups.
+set enable_expr_rewrites=0;
+select id, bool_col from functional_parquet.alltypessmall where 5 + 5 < int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+# Test name-based column resolution
+create table name_resolve stored as parquet as select * from functional_parquet.alltypessmall;
+alter table name_resolve replace columns (int_col int, bool_col boolean, tinyint_col tinyint, smallint_col smallint, id int);
+set parquet_fallback_schema_resolution=NAME;
+# If this picks up the stats from int_col, then it will filter all row groups and return
+# an incorrect result.
+select count(*) from name_resolve where id > 10;
+---- RESULTS
+89
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Query that has an implicit cast to a larger integer type
+select count(*) from functional_parquet.alltypessmall where tinyint_col > 1000000000000
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+# Predicates with explicit casts are not supported when evaluating parquet::Statistics.
+select count(*) from functional_parquet.alltypessmall where '0' > cast(tinyint_col as string)
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Explicit casts between numeric types need not preserve ordering (e.g. casting
+# to tinyint wraps on overflow), so they are not supported when evaluating
+# parquet::Statistics.
+select count(*) from functional_parquet.alltypes where cast(id as tinyint) < 10;
+---- RESULTS
+3878
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 24
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+select count(*) from functional_parquet.complextypestbl.int_array where pos < 5;
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Test the conversion of constant IN lists to min/max predicates
+select count(*) from functional_parquet.alltypes where int_col in (-1,-2,-3,-4);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 24
+aggregation(SUM, NumStatsFilteredRowGroups): 24
+====
+---- QUERY
+select count(*) from functional_parquet.alltypes where id IN (1,25,49);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 24
+aggregation(SUM, NumStatsFilteredRowGroups): 23
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where string_col < "0"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where string_col <= "/"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where string_col < "1"
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where string_col >= "9"
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where string_col > ":"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where timestamp_col < "2009-01-01 00:00:00"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where timestamp_col <= "2009-01-01 00:00:00"
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 3
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where timestamp_col = "2009-01-01 00:00:00"
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 3
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where timestamp_col > "2009-04-03 00:24:00.96"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where timestamp_col >= "2009-04-03 00:24:00.96"
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 3
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where timestamp_col = "2009-04-03 00:24:00.96"
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 3
+====
+---- QUERY
+select count(*) from functional_parquet.decimal_tbl where d1 < 1234
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+select count(*) from functional_parquet.decimal_tbl where d3 < 1.23456789
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+select count(*) from functional_parquet.decimal_tbl where d3 = 1.23456788
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+select count(*) from functional_parquet.decimal_tbl where d3 = 1.23456789
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+select count(*) from functional_parquet.decimal_tbl where d4 > 0.123456789
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+select count(*) from functional_parquet.decimal_tbl where d4 >= 0.12345678
+---- RESULTS
+5
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+select count(*) from functional_parquet.decimal_tbl where d4 >= 0.12345679
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# Test that stats are disabled for CHAR type columns.
+create table chars (id int, c char(4)) stored as parquet;
+insert into chars values (1, cast("abaa" as char(4))), (2, cast("abab" as char(4)));
+select count(*) from chars;
+---- RESULTS
+2
+====
+---- QUERY
+select count(*) from chars where c <= "aaaa"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# IMPALA-4988: Test that stats support can be disabled using the parquet_read_statistics
+# query option.
+set parquet_read_statistics=0;
+select count(*) from functional_parquet.alltypes where id < 0;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 24
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
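
For the NumRowGroups and NumStatsFilteredRowGroups counters asserted throughout
this file, here is a hedged sketch of the min/max pruning idea for a simple
"col < literal" predicate. It is illustrative only; the real logic is in the
C++ scanner and also covers the other comparison operators, IN lists, and the
cast restrictions tested above:

public final class RowGroupPruneSketch {
  // A row group can be skipped for "col < literal" when even its smallest
  // value fails the predicate, i.e. min(col) >= literal.
  static boolean canSkipLessThan(long colMin, long literal) {
    return colMin >= literal;
  }

  public static void main(String[] args) {
    // Per the alltypessmall tests above, min(int_col) is 0, so "int_col < 0"
    // prunes every row group while "int_col < 1" prunes none.
    System.out.println(canSkipLessThan(0, 0));  // true  -> group pruned
    System.out.println(canSkipLessThan(0, 1));  // false -> group scanned
  }
}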