Posted to commits@impala.apache.org by tm...@apache.org on 2017/02/23 16:31:15 UTC

[7/7] incubator-impala git commit: IMPALA-2328: Read support for min/max Parquet statistics

IMPALA-2328: Read support for min/max Parquet statistics

This change adds support for skipping row groups based on Parquet row
group statistics. For now we only support reading statistics from
Parquet files for numerical types (bool, integer, floating point) and
for simple predicates of the form <slot> <op> <constant> or
<constant> <op> <slot>, where <op> is one of LT, LE, GE, GT, or EQ.
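
In short: after NormalizeBinaryPredicatesRule has put the slot on the
left-hand side, the scanner evaluates each min/max conjunct against the row
group's column statistics: the column's min value for LT/LE, its max value
for GT/GE, and both for EQ (which the planner splits into an LE and a GE
predicate). If any of these checks fails, the whole row group is skipped.
A minimal stand-alone sketch of that decision for a single integer column
(the struct and function names below are illustrative only, not Impala's
actual classes):

  #include <cstdint>
  #include <string>

  // Illustrative stand-in for the min/max values decoded from a column
  // chunk's parquet::Statistics.
  struct RowGroupColumnStats {
    bool has_stats;     // statistics present for this column chunk?
    int64_t min_value;
    int64_t max_value;
  };

  // Returns true if no row in the group can satisfy "col <op> constant",
  // so the whole row group may be skipped.
  bool CanSkipRowGroup(const RowGroupColumnStats& stats, const std::string& op,
                       int64_t constant) {
    if (!stats.has_stats) return false;  // no statistics: never skip
    if (op == "LT") return !(stats.min_value < constant);
    if (op == "LE") return !(stats.min_value <= constant);
    if (op == "GT") return !(stats.max_value > constant);
    if (op == "GE") return !(stats.max_value >= constant);
    // EQ: the row group's value range [min, max] must contain the constant.
    if (op == "EQ") {
      return !(stats.min_value <= constant && stats.max_value >= constant);
    }
    return false;  // unsupported operator: be conservative and scan the group
  }

For example, for a predicate id < 10, a row group whose id column has
min_value = 42 can be skipped without reading any of its data pages.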

Change-Id: I39b836165756fcf929c801048d91c50c8fdcdae4
Reviewed-on: http://gerrit.cloudera.org:8080/6032
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/749a55c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/749a55c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/749a55c4

Branch: refs/heads/master
Commit: 749a55c4ad93849e4a63a43ae910a70f5d5f334b
Parents: d564508
Author: Lars Volker <lv...@cloudera.com>
Authored: Sat Feb 4 04:38:49 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 23 11:16:17 2017 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/hdfs-parquet-scanner.cc             |  90 +++++++-
 be/src/exec/hdfs-parquet-scanner.h              |  23 +-
 be/src/exec/hdfs-parquet-table-writer.cc        |   2 +-
 be/src/exec/hdfs-scan-node-base.cc              |  25 ++
 be/src/exec/hdfs-scan-node-base.h               |  16 ++
 be/src/exec/parquet-column-stats.cc             |  64 +++++
 be/src/exec/parquet-column-stats.h              | 119 +++-------
 be/src/exec/parquet-column-stats.inline.h       | 152 ++++++++++++
 be/src/exec/parquet-metadata-utils.cc           |   8 +
 be/src/exec/parquet-metadata-utils.h            |   3 +
 be/src/exprs/expr.h                             |   2 +
 common/thrift/PlanNodes.thrift                  |   7 +
 .../apache/impala/analysis/AnalysisContext.java |   7 +-
 .../apache/impala/analysis/BinaryPredicate.java |  15 +-
 .../impala/planner/HdfsPartitionPruner.java     |   2 +-
 .../org/apache/impala/planner/HdfsScanNode.java | 106 ++++++++-
 .../org/apache/impala/planner/KuduScanNode.java |  36 +--
 .../rewrite/NormalizeBinaryPredicatesRule.java  |  49 ++++
 .../impala/analysis/ExprRewriteRulesTest.java   |  18 ++
 .../org/apache/impala/planner/PlannerTest.java  |   4 +-
 .../queries/PlannerTest/aggregation.test        |   4 +-
 .../queries/PlannerTest/constant-folding.test   | 110 ++++++++-
 .../queries/PlannerTest/data-source-tables.test |   4 +-
 .../queries/PlannerTest/hdfs.test               |   2 +-
 .../queries/PlannerTest/implicit-joins.test     |   2 +-
 .../queries/PlannerTest/kudu.test               |  10 +-
 .../queries/PlannerTest/mt-dop-validation.test  |  16 +-
 .../PlannerTest/runtime-filter-propagation.test |   6 +-
 .../queries/PlannerTest/subquery-rewrite.test   |   8 +-
 .../queries/QueryTest/parquet_stats.test        | 231 +++++++++++++++++++
 tests/query_test/test_insert_parquet.py         |   1 +
 32 files changed, 980 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index fce5c81..57c12cb 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -70,6 +70,7 @@ add_library(Exec
   nested-loop-join-builder.cc
   nested-loop-join-node.cc
   parquet-column-readers.cc
+  parquet-column-stats.cc
   parquet-metadata-utils.cc
   partitioned-aggregation-node.cc
   partitioned-aggregation-node-ir.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 1af711e..2b0454e 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -29,6 +29,7 @@
 #include "exec/hdfs-scanner.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/parquet-column-readers.h"
+#include "exec/parquet-column-stats.h"
 #include "exec/scanner-context.inline.h"
 #include "exprs/expr.h"
 #include "runtime/collection-value-builder.h"
@@ -37,7 +38,6 @@
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
-#include "runtime/scoped-buffer.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/string-value.h"
@@ -149,6 +149,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
       row_group_idx_(-1),
       row_group_rows_read_(0),
       advance_row_group_(true),
+      min_max_tuple_buffer_(scan_node->mem_tracker()),
       row_batches_produced_(0),
       scratch_batch_(new ScratchTupleBatch(
           scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
@@ -169,6 +170,9 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   metadata_range_ = stream_->scan_range();
   num_cols_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
+  num_stats_filtered_row_groups_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumStatsFilteredRowGroups",
+          TUnit::UNIT);
   num_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
   num_scanners_with_no_reads_counter_ =
@@ -187,6 +191,21 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   level_cache_pool_.reset(new MemPool(scan_node_->mem_tracker()));
 
+  // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  if (min_max_tuple_desc) {
+    int64_t tuple_size = min_max_tuple_desc->byte_size();
+    if (!min_max_tuple_buffer_.TryAllocate(tuple_size)) {
+      return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
+            "statistics tuple for file '$1'.", tuple_size, filename()));
+    }
+  }
+
+  // Clone the min/max statistics conjuncts.
+  RETURN_IF_ERROR(Expr::CloneIfNotExists(scan_node_->min_max_conjunct_ctxs(),
+      state_, &min_max_conjuncts_ctxs_));
+  min_max_conjuncts_ctxs_to_eval_.reserve(min_max_conjuncts_ctxs_.size());
+
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
     DCHECK(ctx->filter != NULL);
@@ -283,6 +302,8 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
 
   if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
 
+  Expr::Close(min_max_conjuncts_ctxs_, state_);
+
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
     const FilterStats* stats = filter_ctxs_[i]->stats;
     const LocalFilterStats& local = filter_stats_[i];
@@ -451,6 +472,64 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   return Status::OK();
 }
 
+Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_group,
+    bool* skip_row_group) {
+  *skip_row_group = false;
+
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  if (!min_max_tuple_desc) return Status::OK();
+
+  int64_t tuple_size = min_max_tuple_desc->byte_size();
+
+  Tuple* min_max_tuple = reinterpret_cast<Tuple*>(min_max_tuple_buffer_.buffer());
+  min_max_tuple->Init(tuple_size);
+
+  DCHECK(min_max_tuple_desc->slots().size() == min_max_conjuncts_ctxs_.size());
+
+  min_max_conjuncts_ctxs_to_eval_.clear();
+  for (int i = 0; i < min_max_conjuncts_ctxs_.size(); ++i) {
+    SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
+    ExprContext* conjunct = min_max_conjuncts_ctxs_[i];
+    Expr* e = conjunct->root();
+    DCHECK(e->GetChild(0)->is_slotref());
+
+    int col_idx = slot_desc->col_pos() - scan_node_->num_partition_keys();
+    DCHECK(col_idx < row_group.columns.size());
+
+    if (!ParquetMetadataUtils::HasRowGroupStats(row_group, col_idx)) continue;
+    const parquet::Statistics& stats = row_group.columns[col_idx].meta_data.statistics;
+
+    bool stats_read = false;
+    void* slot = min_max_tuple->GetSlot(slot_desc->tuple_offset());
+    const ColumnType& col_type = slot_desc->type();
+
+    if (e->function_name() == "lt" || e->function_name() == "le") {
+      // We need to get min stats.
+      stats_read = ColumnStatsBase::ReadFromThrift(stats, col_type,
+          ColumnStatsBase::StatsField::MIN, slot);
+    } else if (e->function_name() == "gt" || e->function_name() == "ge") {
+      // We need to get max stats.
+      stats_read = ColumnStatsBase::ReadFromThrift(stats, col_type,
+          ColumnStatsBase::StatsField::MAX, slot);
+    } else {
+      DCHECK(false) << "Unsupported function name for statistics evaluation: "
+          << e->function_name();
+    }
+    if (stats_read) min_max_conjuncts_ctxs_to_eval_.push_back(conjunct);
+  }
+
+  if (!min_max_conjuncts_ctxs_to_eval_.empty()) {
+    TupleRow row;
+    row.SetTuple(0, min_max_tuple);
+    if (!ExecNode::EvalConjuncts(&min_max_conjuncts_ctxs_to_eval_[0],
+          min_max_conjuncts_ctxs_to_eval_.size(), &row)) {
+      *skip_row_group = true;
+    }
+  }
+
+  return Status::OK();
+}
+
 Status HdfsParquetScanner::NextRowGroup() {
   const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
       metadata_range_->meta_data())->original_split;
@@ -502,8 +581,17 @@ Status HdfsParquetScanner::NextRowGroup() {
       misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group, split_range);
       continue;
     }
+
     COUNTER_ADD(num_row_groups_counter_, 1);
 
+    // Evaluate row group statistics.
+    bool skip_row_group_on_stats;
+    RETURN_IF_ERROR(EvaluateStatsConjuncts(row_group, &skip_row_group_on_stats));
+    if (skip_row_group_on_stats) {
+      COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
+      continue;
+    }
+
     // Prepare column readers for first read
     RETURN_IF_ERROR(InitColumns(row_group_idx_, column_readers_));
     bool seeding_ok = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 02d7d5d..87f488f 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -24,6 +24,7 @@
 #include "exec/parquet-common.h"
 #include "exec/parquet-scratch-tuple-batch.h"
 #include "exec/parquet-metadata-utils.h"
+#include "runtime/scoped-buffer.h"
 #include "util/runtime-profile-counters.h"
 
 namespace impala {
@@ -375,7 +376,19 @@ class HdfsParquetScanner : public HdfsScanner {
 
   boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
 
-  /// Cached runtime filter contexts, one for each filter that applies to this column.
+  /// Buffer to back tuples when reading parquet::Statistics.
+  ScopedBuffer min_max_tuple_buffer_;
+
+  /// Min/max statistics contexts, owned by HdfsScanner::state_->obj_pool_.
+  vector<ExprContext*> min_max_conjuncts_ctxs_;
+
+  /// Used in EvaluateStatsConjuncts() to store non-owning copies of conjunct pointers
+  /// from 'min_max_conjuncts_ctxs_'. It is declared here to avoid the dynamic allocation
+  /// overhead.
+  vector<ExprContext*> min_max_conjuncts_ctxs_to_eval_;
+
+  /// Cached runtime filter contexts, one for each filter that applies to this column,
+  /// owned by instances of this class.
   vector<const FilterContext*> filter_ctxs_;
 
   struct LocalFilterStats {
@@ -440,6 +453,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of columns that need to be read.
   RuntimeProfile::Counter* num_cols_counter_;
 
+  /// Number of row groups that are skipped because of Parquet row group statistics.
+  RuntimeProfile::Counter* num_stats_filtered_row_groups_counter_;
+
   /// Number of row groups that need to be read.
   RuntimeProfile::Counter* num_row_groups_counter_;
 
@@ -455,6 +471,11 @@ class HdfsParquetScanner : public HdfsScanner {
 
   virtual Status GetNextInternal(RowBatch* row_batch);
 
+  /// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
+  /// of 'row_group'. Sets 'skip_row_group' to true if the row group can be skipped,
+  /// 'false' otherwise.
+  Status EvaluateStatsConjuncts(const parquet::RowGroup& row_group, bool* skip_row_group);
+
   /// Check runtime filters' effectiveness every BATCHES_PER_FILTER_SELECTIVITY_CHECK
   /// row batches. Will update 'filter_stats_'.
   void CheckFiltersEffectiveness();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 9e9cb3e..6114f4e 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -18,7 +18,7 @@
 #include "exec/hdfs-parquet-table-writer.h"
 
 #include "common/version.h"
-#include "exec/parquet-column-stats.h"
+#include "exec/parquet-column-stats.inline.h"
 #include "exprs/expr-context.h"
 #include "exprs/expr.h"
 #include "rpc/thrift-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 3a5719f..6746ef5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -83,6 +83,9 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
                            const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
       runtime_state_(NULL),
+      min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
+          tnode.hdfs_scan_node.min_max_tuple_id : -1),
+      min_max_tuple_desc_(nullptr),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
           tnode.hdfs_scan_node.skip_header_line_count : 0),
       tuple_id_(tnode.hdfs_scan_node.tuple_id),
@@ -146,6 +149,12 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
   // Add row batch conjuncts
   DCHECK(conjuncts_map_[tuple_id_].empty());
   conjuncts_map_[tuple_id_] = conjunct_ctxs_;
+
+  // Add min max conjuncts
+  RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, tnode.hdfs_scan_node.min_max_conjuncts,
+      &min_max_conjunct_ctxs_));
+  DCHECK(min_max_conjunct_ctxs_.empty() == (min_max_tuple_id_ == -1));
+
   return Status::OK();
 }
 
@@ -169,6 +178,17 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
         Expr::Prepare(entry.second, state, *collection_row_desc, expr_mem_tracker()));
   }
 
+  // Prepare min max statistics conjuncts.
+  if (min_max_tuple_id_ != -1) {
+    min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_);
+    DCHECK(min_max_tuple_desc_ != NULL);
+    RowDescriptor* min_max_row_desc =
+        state->obj_pool()->Add(new RowDescriptor(min_max_tuple_desc_, /* is_nullable */
+        false));
+    RETURN_IF_ERROR(Expr::Prepare(min_max_conjunct_ctxs_, state, *min_max_row_desc,
+        expr_mem_tracker()));
+  }
+
   // One-time initialisation of state that is constant across scan ranges
   DCHECK(tuple_desc_->table_desc() != NULL);
   hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
@@ -352,6 +372,9 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
     RETURN_IF_ERROR(Expr::Open(entry.second, state));
   }
 
+  // Open min max conjuncts
+  RETURN_IF_ERROR(Expr::Open(min_max_conjunct_ctxs_, state));
+
   for (FilterContext& filter: filter_ctxs_) RETURN_IF_ERROR(filter.expr_ctx->Open(state));
 
   // Create template tuples for all partitions.
@@ -458,6 +481,8 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
     Expr::Close(tid_conjunct.second, state);
   }
 
+  Expr::Close(min_max_conjunct_ctxs_, state);
+
   for (auto& filter_ctx: filter_ctxs_) filter_ctx.expr_ctx->Close(state);
   ScanNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 6313f8b..cc31d9b 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -143,6 +143,13 @@ class HdfsScanNodeBase : public ScanNode {
   /// Returns number of partition key slots.
   int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
 
+  int min_max_tuple_id() const { return min_max_tuple_id_; }
+
+  const std::vector<ExprContext*> min_max_conjunct_ctxs() const {
+    return min_max_conjunct_ctxs_;
+  }
+
+  const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; }
   const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
   const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; }
   const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); }
@@ -280,6 +287,15 @@ class HdfsScanNodeBase : public ScanNode {
 
   RuntimeState* runtime_state_;
 
+  /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
+  const int min_max_tuple_id_;
+
+  /// Conjuncts to evaluate on parquet::Statistics.
+  vector<ExprContext*> min_max_conjunct_ctxs_;
+
+  /// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics.
+  TupleDescriptor* min_max_tuple_desc_;
+
   // Number of header lines to skip at the beginning of each file of this table. Only set
   // to values > 0 for hdfs text files.
   const int skip_header_line_count_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
new file mode 100644
index 0000000..202dc02
--- /dev/null
+++ b/be/src/exec/parquet-column-stats.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet-column-stats.inline.h"
+
+namespace impala {
+
+bool ColumnStatsBase::ReadFromThrift(const parquet::Statistics& thrift_stats,
+    const ColumnType& col_type, const StatsField& stats_field, void* slot) {
+  switch (col_type.type) {
+    case TYPE_BOOLEAN:
+      return ColumnStats<bool>::ReadFromThrift(thrift_stats, stats_field, slot);
+    case TYPE_TINYINT:
+      return ColumnStats<int32_t>::ReadFromThrift(thrift_stats, stats_field, slot);
+    case TYPE_SMALLINT:
+      return ColumnStats<int32_t>::ReadFromThrift(thrift_stats, stats_field, slot);
+    case TYPE_INT:
+      return ColumnStats<int32_t>::ReadFromThrift(thrift_stats, stats_field, slot);
+    case TYPE_BIGINT:
+      return ColumnStats<int64_t>::ReadFromThrift(thrift_stats, stats_field, slot);
+    case TYPE_FLOAT:
+      return ColumnStats<float>::ReadFromThrift(thrift_stats, stats_field, slot);
+    case TYPE_DOUBLE:
+      return ColumnStats<double>::ReadFromThrift(thrift_stats, stats_field, slot);
+    case TYPE_TIMESTAMP:
+      /// TODO add support for TimestampValue (IMPALA-4819)
+      break;
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+      /// TODO add support for StringValue (IMPALA-4817)
+      break;
+    case TYPE_DECIMAL:
+      /// TODO add support for DecimalValue (IMPALA-4815)
+      switch (col_type.GetByteSize()) {
+        case 4:
+          break;
+        case 8:
+          break;
+        case 16:
+          break;
+      }
+      break;
+    default:
+      DCHECK(false) << col_type.DebugString();
+  }
+  return false;
+}
+
+}  // end ns impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index a279781..995f20a 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -18,14 +18,19 @@
 #ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H
 #define IMPALA_EXEC_PARQUET_COLUMN_STATS_H
 
-#include <algorithm>
+#include <string>
 #include <type_traits>
 
+#include "runtime/timestamp-value.h"
+#include "runtime/types.h"
+#include "exec/parquet-common.h"
+
 namespace impala {
 
 /// This class, together with its derivatives, is used to track column statistics when
 /// writing parquet files. It provides an interface to populate a parquet::Statistics
-/// object and attach it to an object supplied by the caller.
+/// object and attach it to an object supplied by the caller. It can also be used to
+/// decode parquet::Statistics into slots.
 ///
 /// We currently support tracking 'min' and 'max' values for statistics. The other two
 /// statistical values in parquet.thrift, 'null_count' and 'distinct_count' are not
@@ -50,9 +55,19 @@ namespace impala {
 /// TODO: Populate null_count and distinct_count.
 class ColumnStatsBase {
  public:
+  /// Enum to select statistics value when reading from parquet::Statistics structs.
+  enum class StatsField { MAX, MIN, NULL_COUNT, DISTINCT_COUNT };
+
   ColumnStatsBase() : has_values_(false) {}
   virtual ~ColumnStatsBase() {}
 
+  /// Decodes the parquet::Statistics in 'thrift_stats' and writes the value selected by
+  /// 'stats_field' into the buffer pointed to by 'slot', based on 'col_type'. Returns
+  /// 'true' if reading statistics for columns of type 'col_type' is supported and
+  /// decoding was successful, 'false' otherwise.
+  static bool ReadFromThrift(const parquet::Statistics& thrift_stats,
+      const ColumnType& col_type, const StatsField& stats_field, void* slot);
+
   /// Merges this statistics object with values from 'other'. If other has not been
   /// initialized, then this object will not be changed.
   virtual void Merge(const ColumnStatsBase& other) = 0;
@@ -96,63 +111,31 @@ class ColumnStats : public ColumnStatsBase {
   ColumnStats(int plain_encoded_value_size)
     : ColumnStatsBase(), plain_encoded_value_size_(plain_encoded_value_size) {}
 
+  /// Decodes the parquet::Statistics in 'thrift_stats' and writes the value selected by
+  /// 'stats_field' into the buffer pointed to by 'slot'. Returns 'true' if reading
+  /// statistics for this column type is supported and decoding was successful,
+  /// 'false' otherwise.
+  static bool ReadFromThrift(const parquet::Statistics& thrift_stats,
+      const StatsField& stats_field, void* slot);
+
   /// Updates the statistics based on the value 'v'. If necessary, initializes the
   /// statistics.
-  void Update(const T& v) {
-    if (!has_values_) {
-      has_values_ = true;
-      min_value_ = v;
-      max_value_ = v;
-    } else {
-      min_value_ = std::min(min_value_, v);
-      max_value_ = std::max(max_value_, v);
-    }
-  }
-
-  virtual void Merge(const ColumnStatsBase& other) override {
-    DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
-    const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
-    if (!cs->has_values_) return;
-    if (!has_values_) {
-      has_values_ = true;
-      min_value_ = cs->min_value_;
-      max_value_ = cs->max_value_;
-    } else {
-      min_value_ = std::min(min_value_, cs->min_value_);
-      max_value_ = std::max(max_value_, cs->max_value_);
-    }
-  }
-
-  virtual int64_t BytesNeeded() const override {
-    return BytesNeededInternal(min_value_) + BytesNeededInternal(max_value_);
-  }
-
-  virtual void EncodeToThrift(parquet::Statistics* out) const override {
-    DCHECK(has_values_);
-    string min_str;
-    EncodeValueToString(min_value_, &min_str);
-    out->__set_min(move(min_str));
-    string max_str;
-    EncodeValueToString(max_value_, &max_str);
-    out->__set_max(move(max_str));
-  }
+  void Update(const T& v);
+
+  virtual void Merge(const ColumnStatsBase& other) override;
+  virtual int64_t BytesNeeded() const override;
+  virtual void EncodeToThrift(parquet::Statistics* out) const override;
 
  protected:
   /// Encodes a single value using parquet's PLAIN encoding and stores it into the
   /// binary string 'out'.
-  void EncodeValueToString(const T& v, string* out) const {
-    int64_t bytes_needed = BytesNeededInternal(v);
-    out->resize(bytes_needed);
-    int64_t bytes_written = ParquetPlainEncoder::Encode(
-        reinterpret_cast<uint8_t*>(&(*out)[0]), bytes_needed, v);
-    DCHECK_EQ(bytes_needed, bytes_written);
-  }
+  void EncodeValueToString(const T& v, std::string* out) const;
+
+  /// Decodes a single statistics value from 'buffer' into 'result'.
+  static bool DecodeValueFromThrift(const std::string& buffer, T* result);
 
   /// Returns the number of bytes needed to encode value 'v'.
-  int64_t BytesNeededInternal(const T& v) const {
-    return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) :
-        plain_encoded_value_size_;
-  }
+  int64_t BytesNeededInternal(const T& v) const;
 
   // Size of each encoded value in plain encoding, -1 if the type is variable-length.
   int plain_encoded_value_size_;
@@ -164,39 +147,5 @@ class ColumnStats : public ColumnStatsBase {
   T max_value_;
 };
 
-/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus
-/// needs special handling here.
-template <>
-void ColumnStats<bool>::EncodeValueToString(const bool& v, string* out) const {
-  char c = v;
-  out->assign(1, c);
-}
-
-template <>
-int64_t ColumnStats<bool>::BytesNeededInternal(const bool& v) const {
-  return 1;
-}
-
-/// parquet-mr and subsequently Hive currently do not handle the following types
-/// correctly (PARQUET-251, PARQUET-686), so we disable support for them.
-/// The relevant Impala Jiras are for
-/// - StringValue    IMPALA-4817
-/// - TimestampValue IMPALA-4819
-/// - DecimalValue   IMPALA-4815
-template <>
-void ColumnStats<StringValue>::Update(const StringValue& v) {}
-
-template <>
-void ColumnStats<TimestampValue>::Update(const TimestampValue& v) {}
-
-template <>
-void ColumnStats<Decimal4Value>::Update(const Decimal4Value& v) {}
-
-template <>
-void ColumnStats<Decimal8Value>::Update(const Decimal8Value& v) {}
-
-template <>
-void ColumnStats<Decimal16Value>::Update(const Decimal16Value& v) {}
-
 } // end ns impala
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
new file mode 100644
index 0000000..3738980
--- /dev/null
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
+#define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
+
+#include "parquet-column-stats.h"
+
+namespace impala {
+
+template <typename T>
+inline bool ColumnStats<T>::ReadFromThrift(const parquet::Statistics& thrift_stats,
+    const StatsField& stats_field, void* slot) {
+  T* out = reinterpret_cast<T*>(slot);
+  switch (stats_field) {
+    case StatsField::MIN:
+      return DecodeValueFromThrift(thrift_stats.min, out);
+    case StatsField::MAX:
+      return DecodeValueFromThrift(thrift_stats.max, out);
+    default:
+      DCHECK(false) << "Unsupported statistics field requested";
+      return false;
+  }
+}
+
+template <typename T>
+inline void ColumnStats<T>::Update(const T& v) {
+  if (!has_values_) {
+    has_values_ = true;
+    min_value_ = v;
+    max_value_ = v;
+  } else {
+    min_value_ = std::min(min_value_, v);
+    max_value_ = std::max(max_value_, v);
+  }
+}
+
+template <typename T>
+inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
+  DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
+  const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
+  if (!cs->has_values_) return;
+  if (!has_values_) {
+    has_values_ = true;
+    min_value_ = cs->min_value_;
+    max_value_ = cs->max_value_;
+  } else {
+    min_value_ = std::min(min_value_, cs->min_value_);
+    max_value_ = std::max(max_value_, cs->max_value_);
+  }
+}
+
+template <typename T>
+inline int64_t ColumnStats<T>::BytesNeeded() const {
+  return BytesNeededInternal(min_value_) + BytesNeededInternal(max_value_);
+}
+
+template <typename T>
+inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const {
+  DCHECK(has_values_);
+  std::string min_str;
+  EncodeValueToString(min_value_, &min_str);
+  out->__set_min(move(min_str));
+  std::string max_str;
+  EncodeValueToString(max_value_, &max_str);
+  out->__set_max(move(max_str));
+}
+
+template <typename T>
+inline void ColumnStats<T>::EncodeValueToString(const T& v, std::string* out) const {
+  int64_t bytes_needed = BytesNeededInternal(v);
+  out->resize(bytes_needed);
+  int64_t bytes_written = ParquetPlainEncoder::Encode(
+      reinterpret_cast<uint8_t*>(&(*out)[0]), bytes_needed, v);
+  DCHECK_EQ(bytes_needed, bytes_written);
+}
+
+template <typename T>
+inline bool ColumnStats<T>::DecodeValueFromThrift(const std::string& buffer, T* result) {
+  int size = buffer.size();
+  // The ParquetPlainEncoder interface expects mutable pointers.
+  uint8_t* data = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(&buffer[0]));
+  if (ParquetPlainEncoder::Decode(data, data + size, -1, result) == -1) return false;
+  return true;
+}
+
+template <typename T>
+inline int64_t ColumnStats<T>::BytesNeededInternal(const T& v) const {
+  return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) :
+      plain_encoded_value_size_;
+}
+
+/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus
+/// needs special handling here.
+template <>
+inline void ColumnStats<bool>::EncodeValueToString(const bool& v, std::string* out) const
+{
+  char c = v;
+  out->assign(1, c);
+}
+
+template <>
+inline bool ColumnStats<bool>::DecodeValueFromThrift(const std::string& buffer,
+    bool* result) {
+  DCHECK(buffer.size() == 1);
+  *result = (buffer[0] != 0);
+  return true;
+}
+
+template <>
+inline int64_t ColumnStats<bool>::BytesNeededInternal(const bool& v) const {
+  return 1;
+}
+
+/// parquet-mr and subsequently Hive currently do not handle the following types
+/// correctly (PARQUET-251, PARQUET-686), so we disable support for them.
+/// The relevant Impala Jiras are for
+/// - StringValue    IMPALA-4817
+/// - TimestampValue IMPALA-4819
+/// - DecimalValue   IMPALA-4815
+template <>
+inline void ColumnStats<StringValue>::Update(const StringValue& v) {}
+
+template <>
+inline void ColumnStats<TimestampValue>::Update(const TimestampValue& v) {}
+
+template <>
+inline void ColumnStats<Decimal4Value>::Update(const Decimal4Value& v) {}
+
+template <>
+inline void ColumnStats<Decimal8Value>::Update(const Decimal8Value& v) {}
+
+template <>
+inline void ColumnStats<Decimal16Value>::Update(const Decimal16Value& v) {}
+
+
+} // end ns impala
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
index da3844a..7fb2313 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -27,6 +27,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "exec/parquet-common.h"
+#include "exec/parquet-column-stats.h"
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
 
@@ -217,6 +218,13 @@ Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_me
   return Status::OK();
 }
 
+bool ParquetMetadataUtils::HasRowGroupStats(const parquet::RowGroup& row_group,
+    int col_idx) {
+  DCHECK(col_idx < row_group.columns.size());
+  const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
+  return col_chunk.__isset.meta_data && col_chunk.meta_data.__isset.statistics;
+}
+
 ParquetFileVersion::ParquetFileVersion(const string& created_by) {
   string created_by_lower = created_by;
   std::transform(created_by_lower.begin(), created_by_lower.end(),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
index 7a1e897..fe7552e 100644
--- a/be/src/exec/parquet-metadata-utils.h
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -50,6 +50,9 @@ class ParquetMetadataUtils {
       const char* filename, int row_group_idx, int col_idx,
       const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
       RuntimeState* state);
+
+  /// Returns whether column 'col_idx' in 'row_group' has statistics attached to it.
+  static bool HasRowGroupStats(const parquet::RowGroup& row_group, int col_idx);
 };
 
 struct ParquetFileVersion {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/be/src/exprs/expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index 3a92c21..7b1b68a 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -146,6 +146,8 @@ class Expr {
   virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow*);
   virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow*);
 
+  const std::string& function_name() const { return fn_.name.function_name; }
+
   /// Get the number of digits after the decimal that should be displayed for this value.
   /// Returns -1 if no scale has been specified (currently the scale is only set for
   /// doubles set by RoundUpTo). GetValue() must have already been called.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 233fbb2..7de9d2b 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -204,6 +204,13 @@ struct THdfsScanNode {
   // If this is true then the MT_DOP query option must be > 0.
   // TODO: Remove this option when the MT scan node supports all file formats.
   6: optional bool use_mt_scan_node
+
+  // Conjuncts that can be evaluated against parquet::Statistics using the tuple
+  // referenced by 'min_max_tuple_id'.
+  7: optional list<Exprs.TExpr> min_max_conjuncts
+
+  // Tuple to evaluate 'min_max_conjuncts' against.
+  8: optional Types.TTupleId min_max_tuple_id
 }
 
 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 8cbe268..6b2281f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -40,6 +40,7 @@ import org.apache.impala.rewrite.ExprRewriteRule;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.rewrite.ExtractCommonConjunctRule;
 import org.apache.impala.rewrite.FoldConstantsRule;
+import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
 import org.apache.impala.rewrite.NormalizeExprsRule;
 import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.apache.impala.thrift.TAccessEvent;
@@ -75,10 +76,14 @@ public class AnalysisContext {
     catalog_ = catalog;
     queryCtx_ = queryCtx;
     authzConfig_ = authzConfig;
+    List<ExprRewriteRule> rules = Lists.newArrayList();
     // BetweenPredicates must be rewritten to be executable. Other non-essential
     // expr rewrites can be disabled via a query option. When rewrites are enabled
     // BetweenPredicates should be rewritten first to help trigger other rules.
-    List<ExprRewriteRule> rules = Lists.newArrayList(BetweenToCompoundRule.INSTANCE);
+    rules.add(BetweenToCompoundRule.INSTANCE);
+    // Binary predicates must be rewritten to a canonical form for both Kudu predicate
+    // pushdown and Parquet row group pruning based on min/max statistics.
+    rules.add(NormalizeBinaryPredicatesRule.INSTANCE);
     if (queryCtx.getClient_request().getQuery_options().enable_expr_rewrites) {
       rules.add(FoldConstantsRule.INSTANCE);
       rules.add(NormalizeExprsRule.INSTANCE);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
index a8669e2..145dbce 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
@@ -252,12 +252,9 @@ public class BinaryPredicate extends Predicate {
    * TODO: revisit CAST handling at the caller
    */
   public Expr getSlotBinding(SlotId id) {
-    // check left operand
+    // BinaryPredicates are normalized, so we only need to check the left operand.
     SlotRef slotRef = getChild(0).unwrapSlotRef(false);
     if (slotRef != null && slotRef.getSlotId() == id) return getChild(1);
-    // check right operand
-    slotRef = getChild(1).unwrapSlotRef(false);
-    if (slotRef != null && slotRef.getSlotId() == id) return getChild(0);
     return null;
   }
 
@@ -285,15 +282,11 @@ public class BinaryPredicate extends Predicate {
   }
 
   /**
-   * If predicate is of the form "<SlotRef> op <Expr>" or "<Expr> op <SlotRef>",
-   * returns the SlotRef, otherwise returns null.
+   * If predicate is of the form "<SlotRef> op <Expr>", returns the SlotRef, otherwise
+   * returns null.
    */
   @Override
-  public SlotRef getBoundSlot() {
-    SlotRef slotRef = getChild(0).unwrapSlotRef(true);
-    if (slotRef != null) return slotRef;
-    return getChild(1).unwrapSlotRef(true);
-  }
+  public SlotRef getBoundSlot() { return getChild(0).unwrapSlotRef(true); }
 
   /**
    * Negates a BinaryPredicate.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
index 046240f..cc27528 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
@@ -89,7 +89,7 @@ public class HdfsPartitionPruner {
 
   /**
    * Return a list of partitions left after applying the conjuncts. Please note
-   * that conjunts used for filtering will be removed from the list 'conjuncts'.
+   * that conjuncts used for filtering will be removed from the list 'conjuncts'.
    * If 'allowEmpty' is False, empty partitions are not returned.
    */
   public List<HdfsPartition> prunePartitions(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0aaaa51..cb770cb 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -28,8 +28,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
@@ -71,13 +74,18 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
- * Scan of a single single table. Currently limited to full-table scans.
+ * Scan of a single table. Currently limited to full-table scans.
  *
  * It's expected that the creator of this object has already done any necessary
  * partition pruning before creating this object. In other words, the 'conjuncts'
  * passed to the constructors are conjucts not fully evaluated by partition pruning
  * and 'partitions' are the remaining partitions after pruning.
  *
+ * For scans of tables with Parquet files the class creates an additional list of
+ * conjuncts that are passed to the backend and will be evaluated against the
+ * parquet::Statistics of row groups. If the conjuncts don't match, then whole row groups
+ * will be skipped.
+ *
  * TODO: pass in range restrictions.
  */
 public class HdfsScanNode extends ScanNode {
@@ -136,6 +144,21 @@ public class HdfsScanNode extends ScanNode {
 
   private static final Configuration CONF = new Configuration();
 
+
+  // List of conjuncts for min/max values of parquet::Statistics, that are used to skip
+  // data when scanning Parquet files.
+  private List<Expr> minMaxConjuncts_ = Lists.newArrayList();
+
+  // List of PlanNode conjuncts that have been transformed into conjuncts in
+  // 'minMaxConjuncts_'.
+  private List<Expr> minMaxOriginalConjuncts_ = Lists.newArrayList();
+
+  // Tuple that is used to materialize statistics when scanning Parquet files. For each
+  // column it can contain 0, 1, or 2 slots, depending on whether the column needs to be
+  // evaluated against the min and/or the max value of the corresponding
+  // parquet::Statistics.
+  private TupleDescriptor minMaxTuple_;
+
   /**
    * Construct a node to scan given data files into tuples described by 'desc',
    * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -195,6 +218,10 @@ public class HdfsScanNode extends ScanNode {
       useMtScanNode_ = false;
     }
 
+    if (fileFormats.contains(HdfsFileFormat.PARQUET)) {
+      computeMinMaxTupleAndConjuncts(analyzer);
+    }
+
     // do this at the end so it can take all conjuncts and scan ranges into account
     computeStats(analyzer);
 
@@ -272,6 +299,73 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
+   * Builds a predicate to evaluate against parquet::Statistics by copying 'inputSlot'
+   * into 'minMaxTuple_', combining 'inputSlot', 'inputPred' and 'op' into a new
+   * predicate, and adding it to 'minMaxConjuncts_'.
+   */
+  private void buildStatsPredicate(Analyzer analyzer, SlotRef inputSlot,
+      BinaryPredicate inputPred, BinaryPredicate.Operator op) {
+    // Obtain the rhs expr of the input predicate
+    Expr constExpr = inputPred.getChild(1);
+    Preconditions.checkState(constExpr.isConstant());
+
+    // Make a new slot descriptor, which adds it to the tuple descriptor.
+    SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(minMaxTuple_,
+        inputSlot.getDesc());
+    SlotRef slot = new SlotRef(slotDesc);
+    BinaryPredicate statsPred = new BinaryPredicate(op, slot, constExpr);
+    statsPred.analyzeNoThrow(analyzer);
+    minMaxConjuncts_.add(statsPred);
+  }
+
+  /**
+   * Analyzes 'conjuncts_', populates 'minMaxTuple_' with slots for statistics values, and
+   * populates 'minMaxConjuncts_' with conjuncts pointing into the 'minMaxTuple_'. Only
+   * conjuncts of the form <slot> <op> <constant> are supported, and <op> must be one of
+   * LT, LE, GE, GT, or EQ.
+   */
+  private void computeMinMaxTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{
+    Preconditions.checkNotNull(desc_.getPath());
+    String tupleName = desc_.getPath().toString() + " statistics";
+    DescriptorTable descTbl = analyzer.getDescTbl();
+    minMaxTuple_ = descTbl.createTupleDescriptor(tupleName);
+    minMaxTuple_.setPath(desc_.getPath());
+
+    for (Expr pred: conjuncts_) {
+      if (!(pred instanceof BinaryPredicate)) continue;
+      BinaryPredicate binaryPred = (BinaryPredicate) pred;
+
+      // We only support slot refs on the left hand side of the predicate; a rewriting
+      // rule makes sure that all compatible exprs are rewritten into this form.
+      if (!(binaryPred.getChild(0) instanceof SlotRef)) continue;
+      SlotRef slot = (SlotRef) binaryPred.getChild(0);
+
+      // This node is a table scan, so this must be a scanning slot.
+      Preconditions.checkState(slot.getDesc().isScanSlot());
+
+      Expr constExpr = binaryPred.getChild(1);
+      // Only constant exprs can be evaluated against parquet::Statistics. This includes
+      // LiteralExpr, but can also be an expr like "1 + 2".
+      if (!constExpr.isConstant()) continue;
+      if (constExpr.isNullLiteral()) continue;
+
+      BinaryPredicate.Operator op = binaryPred.getOp();
+      if (op == BinaryPredicate.Operator.LT || op == BinaryPredicate.Operator.LE ||
+          op == BinaryPredicate.Operator.GE || op == BinaryPredicate.Operator.GT) {
+        minMaxOriginalConjuncts_.add(pred);
+        buildStatsPredicate(analyzer, slot, binaryPred, op);
+      } else if (op == BinaryPredicate.Operator.EQ) {
+        minMaxOriginalConjuncts_.add(pred);
+        // TODO: this could be optimized for boolean columns.
+        buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.LE);
+        buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.GE);
+      }
+
+    }
+    minMaxTuple_.computeMemLayout();
+  }
+
+  /**
    * Recursively collects and assigns conjuncts bound by tuples materialized in a
    * collection-typed slot.
    *
@@ -570,6 +664,12 @@ public class HdfsScanNode extends ScanNode {
       msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_);
     }
     msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
+    if (!minMaxConjuncts_.isEmpty()) {
+      for (Expr e: minMaxConjuncts_) {
+        msg.hdfs_scan_node.addToMin_max_conjuncts(e.treeToThrift());
+      }
+      msg.hdfs_scan_node.setMin_max_tuple_id(minMaxTuple_.getId().asInt());
+    }
   }
 
   @Override
@@ -631,6 +731,10 @@ public class HdfsScanNode extends ScanNode {
             numPartitionsNoDiskIds_, numPartitions, numFilesNoDiskIds_,
             totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size()));
       }
+      if (!minMaxOriginalConjuncts_.isEmpty()) {
+        output.append(detailPrefix + "parquet statistics predicates: " +
+            getExplainString(minMaxOriginalConjuncts_) + "\n");
+      }
     }
     return output.toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 02506e5..8e96bcd 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -88,8 +88,7 @@ public class KuduScanNode extends ScanNode {
   // From analyzer.getHostIndex().getIndex(address)
   private final Set<Integer> hostIndexSet_ = Sets.newHashSet();
 
-  // List of conjuncts that can be pushed down to Kudu, after they have been normalized
-  // by BinaryPredicate.normalizeSlotRefComparison(). Used for computing stats and
+  // List of conjuncts that can be pushed down to Kudu. Used for computing stats and
   // explain strings.
   private final List<Expr> kuduConjuncts_ = Lists.newArrayList();
 
@@ -327,12 +326,14 @@ public class KuduScanNode extends ScanNode {
     BinaryPredicate predicate = (BinaryPredicate) expr;
 
     // TODO KUDU-931 look into handling implicit/explicit casts on the SlotRef.
-    predicate = normalizeSlotRefComparison(predicate, analyzer);
-    if (predicate == null) return false;
+
     ComparisonOp op = getKuduOperator(predicate.getOp());
     if (op == null) return false;
 
+    if (!(predicate.getChild(0) instanceof SlotRef)) return false;
     SlotRef ref = (SlotRef) predicate.getChild(0);
+
+    if (!(predicate.getChild(1) instanceof LiteralExpr)) return false;
     LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
 
     // Cannot push predicates with null literal values (KUDU-1595).
@@ -458,31 +459,4 @@ public class KuduScanNode extends ScanNode {
       default: return null;
     }
   }
-
-
-  /**
-   * Normalizes and returns a copy of 'predicate' consisting of an uncast SlotRef and a
-   * constant Expr into the following form: <SlotRef> <Op> <LiteralExpr>.
-   * Assumes that constant expressions have already been folded.
-   */
-  private static BinaryPredicate normalizeSlotRefComparison(BinaryPredicate predicate,
-      Analyzer analyzer) {
-    SlotRef ref = null;
-    if (predicate.getChild(0) instanceof SlotRef) {
-      ref = (SlotRef) predicate.getChild(0);
-    } else if (predicate.getChild(1) instanceof SlotRef) {
-      ref = (SlotRef) predicate.getChild(1);
-    }
-
-    if (ref == null) return null;
-    if (ref != predicate.getChild(0)) {
-      Preconditions.checkState(ref == predicate.getChild(1));
-      predicate = new BinaryPredicate(predicate.getOp().converse(), ref,
-          predicate.getChild(0));
-      predicate.analyzeNoThrow(analyzer);
-    }
-
-    if (!(predicate.getChild(1) instanceof LiteralExpr)) return null;
-    return predicate;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/main/java/org/apache/impala/rewrite/NormalizeBinaryPredicatesRule.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/rewrite/NormalizeBinaryPredicatesRule.java b/fe/src/main/java/org/apache/impala/rewrite/NormalizeBinaryPredicatesRule.java
new file mode 100644
index 0000000..3a2ac57
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/rewrite/NormalizeBinaryPredicatesRule.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.rewrite;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.BoolLiteral;
+import org.apache.impala.analysis.CompoundPredicate;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.common.AnalysisException;
+
+/**
+ * Normalizes binary predicates of the form <expr> <op> <slot> so that the slot is
+ * on the left hand side. Predicates where <slot> is wrapped in a cast (implicit or
+ * explicit) are normalized, too.
+ *
+ * Examples:
+ * 5 > id -> id < 5
+ */
+public class NormalizeBinaryPredicatesRule implements ExprRewriteRule {
+  public static ExprRewriteRule INSTANCE = new NormalizeBinaryPredicatesRule();
+
+  @Override
+  public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
+    if (!(expr instanceof BinaryPredicate)) return expr;
+    if (expr.getChild(0).unwrapSlotRef(false) != null) return expr;
+    if (expr.getChild(1).unwrapSlotRef(false) == null) return expr;
+
+    BinaryPredicate.Operator op = ((BinaryPredicate) expr).getOp();
+
+    return new BinaryPredicate(op.converse(), expr.getChild(1), expr.getChild(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
index 3abdf67..41dbeb8 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
@@ -28,6 +28,7 @@ import org.apache.impala.rewrite.ExprRewriteRule;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.rewrite.ExtractCommonConjunctRule;
 import org.apache.impala.rewrite.FoldConstantsRule;
+import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
 import org.apache.impala.rewrite.NormalizeExprsRule;
 import org.junit.Assert;
 import org.junit.Test;
@@ -337,4 +338,21 @@ public class ExprRewriteRulesTest extends FrontendTestBase {
     RewritesOk("false or id = 1", rule, null);
     RewritesOk("false or true", rule, null);
   }
+
+  @Test
+  public void TestNormalizeBinaryPredicatesRule() throws AnalysisException {
+    ExprRewriteRule rule = NormalizeBinaryPredicatesRule.INSTANCE;
+
+    RewritesOk("0 = id", rule, "id = 0");
+    RewritesOk("cast(0 as double) = id", rule, "id = CAST(0 AS DOUBLE)");
+    RewritesOk("1 + 1 = cast(id as int)", rule, "CAST(id AS INT) = 1 + 1");
+
+    // Verify that these don't get rewritten.
+    RewritesOk("id = 5", rule, null);
+    RewritesOk("5 = id + 2", rule, null);
+    RewritesOk("cast(id as int) = int_col", rule, null);
+    RewritesOk("int_col = cast(id as int)", rule, null);
+    RewritesOk("int_col = tinyint_col", rule, null);
+    RewritesOk("tinyint_col = int_col", rule, null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index a88a3cd..42e50be 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -51,7 +51,9 @@ public class PlannerTest extends PlannerTestBase {
     // Tests that constant folding is applied to all relevant PlanNodes and DataSinks.
     // Note that not all Exprs are printed in the explain plan, so validating those
     // via this test is currently not possible.
-    runPlannerTestFile("constant-folding");
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("constant-folding", options);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index dab490e..2fec7c4 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -786,7 +786,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
-   predicates: id % 100 = day
+   predicates: day = id % 100
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -806,7 +806,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
-   predicates: id % 100 = day
+   predicates: day = id % 100
 ====
 # test group_concat with distinct together with another distinct aggregate function
 select count(distinct cast(timestamp_col as string)),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index d19d86e..d14c462 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -7,26 +7,51 @@ where 5 + 5 < c_custkey and o_orderkey = (2 + 2)
 PLAN-ROOT SINK
 |
 01:SUBPLAN
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2,1,0 row-size=52B cardinality=1500000
 |
 |--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=2,1,0 row-size=52B cardinality=100
 |  |
 |  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=0 row-size=24B cardinality=1
 |  |
 |  04:SUBPLAN
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=2,1 row-size=28B cardinality=100
 |  |
 |  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  hosts=3 per-host-mem=unavailable
+|  |  |  tuple-ids=2,1 row-size=28B cardinality=10
 |  |  |
 |  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     hosts=3 per-host-mem=unavailable
+|  |  |     tuple-ids=1 row-size=24B cardinality=1
 |  |  |
 |  |  06:UNNEST [o.o_lineitems]
+|  |     parent-subplan=04
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     hosts=3 per-host-mem=unavailable
+|     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=577.87MB
-   predicates: 10 < c_custkey, !empty(c.c_orders)
+   partitions=1/1 files=4 size=292.36MB
+   predicates: c_custkey > 10, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey = 4
    predicates on o_lineitems: 20 + l_linenumber < 0
+   table stats: 150000 rows total
+   columns missing stats: c_orders
+   parquet statistics predicates: c_custkey > 10
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=24B cardinality=15000
 ====
 # Test HBase scan node.
 select * from functional_hbase.stringids
@@ -39,7 +64,11 @@ PLAN-ROOT SINK
    start key: 10
    stop key: 20\0
    hbase filters: d:string_col EQUAL '4'
-   predicates: 5 = tinyint_col, string_col = '4'
+   predicates: tinyint_col = 5, string_col = '4'
+   table stats: 10000 rows total
+   column stats: all
+   hosts=100 per-host-mem=unavailable
+   tuple-ids=0 row-size=119B cardinality=1
 ====
 # Test datasource scan node.
 select * from functional.alltypes_datasource
@@ -48,8 +77,10 @@ where tinyint_col < (pow(2, 8)) and float_col != 0 and 1 + 1 > int_col
 PLAN-ROOT SINK
 |
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
-data source predicates: tinyint_col < 256, 2 > int_col
+data source predicates: tinyint_col < 256, int_col < 2
 predicates: float_col != 0
+   hosts=1 per-host-mem=unavailable
+   tuple-ids=0 row-size=116B cardinality=500
 ====
 # Test aggregation.
 select sum(1 + 1 + id) sm
@@ -65,9 +96,15 @@ PLAN-ROOT SINK
 |  output: sum(2 + id), count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=17B cardinality=0
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=20B cardinality=7300
 ====
 # Test hash join.
 select 1 from functional.alltypes a
@@ -81,14 +118,24 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: 2 + a.id = b.id - 2
 |  other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col
-|  other predicates: 11.1 < CAST(b.double_col AS DECIMAL(3,2))
+|  other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
-|     predicates: 11.1 < CAST(b.double_col AS DECIMAL(3,2))
+|     predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
+|     table stats: 7300 rows total
+|     column stats: all
+|     hosts=3 per-host-mem=unavailable
+|     tuple-ids=1 row-size=20B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test nested-loop join. Same as above but and with a disjunction in the On clause.
 # The Where-clause predicate has the lhs/rhs flipped.
@@ -103,13 +150,23 @@ PLAN-ROOT SINK
 02:NESTED LOOP JOIN [LEFT OUTER JOIN]
 |  join predicates: (2 + a.id = b.id - 2 OR a.int_col >= 0 + b.bigint_col AND a.int_col <= b.bigint_col + 97)
 |  predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
+|     table stats: 7300 rows total
+|     column stats: all
+|     hosts=3 per-host-mem=unavailable
+|     tuple-ids=1 row-size=20B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test distinct aggregation with grouping.
 select sum(distinct 1 + 1 + id)
@@ -123,13 +180,21 @@ PLAN-ROOT SINK
 |  output: sum(2 + id), count:merge(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: 1048576 * count(*) % 2 = 0
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2 row-size=17B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=17B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=20B cardinality=7300
 ====
 # Test non-grouping distinct aggregation.
 select sum(distinct 1 + 1 + id)
@@ -141,13 +206,21 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count:merge(*)
 |  having: 1048576 * zeroifnull(count(*)) % 2 = 0
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2 row-size=16B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: 2 + id
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=16B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=4B cardinality=7300
 ====
 # Test analytic eval node.
 select first_value(1 + 1 + int_col - (1 - 1)) over
@@ -162,12 +235,20 @@ PLAN-ROOT SINK
 |  partition by: concat('ab', string_col)
 |  order by: greatest(20, bigint_col) ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=3,2 row-size=37B cardinality=7300
 |
 01:SORT
 |  order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=3 row-size=29B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=29B cardinality=7300
 ====
 # Test sort node.
 select int_col from functional.alltypes
@@ -177,9 +258,15 @@ PLAN-ROOT SINK
 |
 01:SORT
 |  order by: id * 7.5 ASC
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=8B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test HDFS table sink.
 insert into functional.alltypes (id, int_col) partition(year,month)
@@ -188,9 +275,14 @@ from functional.alltypessmall
 ---- PLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + year AS INT),CAST(month - -1 AS INT))]
 |  partitions=4
+|  hosts=1 per-host-mem=unavailable
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   table stats: 100 rows total
+   column stats: all
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=16B cardinality=100
 ====
 # Constant folding does not work across query blocks.
 select sum(id + c3) from
@@ -204,8 +296,14 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum(id + 10 + 20 + 30)
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=4 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
    limit: 2
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=4B cardinality=2
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
index fd7dc1e..b814446 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
@@ -27,8 +27,8 @@ where 10 > int_col and
 PLAN-ROOT SINK
 |
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
-data source predicates: 10 > int_col, string_col != 'Foo'
-predicates: 5 > double_col, NOT 5.0 = double_col, NOT TRUE = bool_col, string_col != 'Bar'
+data source predicates: int_col < 10, string_col != 'Foo'
+predicates: double_col < 5, NOT bool_col = TRUE, NOT double_col = 5.0, string_col != 'Bar'
 ====
 # The 3rd predicate is not in a form that can be offered to the data source so
 # the 4th will be offered and accepted instead.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test b/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
index 83f1b46..03b2cdc 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
@@ -1028,5 +1028,5 @@ not implemented: Unsupported non-deterministic predicate: rand() > 100
 # IMPALA-4592: Same as above but the predicate references a partition column.
 select * from functional.alltypes where rand() > year
 ---- PLAN
-not implemented: Unsupported non-deterministic predicate: rand() > year
+not implemented: Unsupported non-deterministic predicate: year < rand()
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
index 229c1e6..e5aa5e8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
@@ -8,7 +8,7 @@ from (select * from functional.alltypestiny) t1
 PLAN-ROOT SINK
 |
 04:NESTED LOOP JOIN [INNER JOIN]
-|  predicates: (coalesce(functional.alltypestiny.id, t3.id) = t3.id)
+|  predicates: t3.id = coalesce(functional.alltypestiny.id, t3.id)
 |
 |--02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 9f2270f..1549ec7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -102,7 +102,7 @@ and zip > 1 and zip < 50
 PLAN-ROOT SINK
 |
 00:SCAN KUDU [functional_kudu.testtbl]
-   kudu predicates: id <= 20, zip <= 30, id >= 10, zip < 50, zip <= 5, zip > 1, zip >= 0, name = 'foo'
+   kudu predicates: id <= 20, id >= 10, zip < 50, zip <= 30, zip <= 5, zip > 1, zip >= 0, name = 'foo'
 ---- SCANRANGELOCATIONS
 NODE 0:
   ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1004))}
@@ -112,7 +112,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
-   kudu predicates: id <= 20, zip <= 30, id >= 10, zip < 50, zip <= 5, zip > 1, zip >= 0, name = 'foo'
+   kudu predicates: id <= 20, id >= 10, zip < 50, zip <= 30, zip <= 5, zip > 1, zip >= 0, name = 'foo'
 ====
 # Test constant folding.
 select * from functional_kudu.testtbl
@@ -122,7 +122,7 @@ PLAN-ROOT SINK
 |
 00:SCAN KUDU [functional_kudu.testtbl]
    predicates: CAST(sin(id) AS BOOLEAN) = TRUE
-   kudu predicates: id < 103, id <= 60, id < 40
+   kudu predicates: id < 103, id < 40, id <= 60
 ---- SCANRANGELOCATIONS
 NODE 0:
   ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1004))}
@@ -133,7 +133,7 @@ PLAN-ROOT SINK
 |
 00:SCAN KUDU [functional_kudu.testtbl]
    predicates: CAST(sin(id) AS BOOLEAN) = TRUE
-   kudu predicates: id < 103, id <= 60, id < 40
+   kudu predicates: id < 103, id < 40, id <= 60
 ====
 # Some predicates can be pushed down but others can't (predicate on an non-const value).
 select * from functional_kudu.testtbl
@@ -216,7 +216,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |
 00:SCAN KUDU [functional_kudu.alltypes]
-   kudu predicates: id > 1475059865, id < 1475059775
+   kudu predicates: id < 1475059775, id > 1475059865
 ====
 # IMPALA-2521: clustered insert into table adds sort node.
 insert into table functional_kudu.alltypes /*+ clustered */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 6ce43db..5547d3a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -57,6 +57,7 @@ PLAN-ROOT SINK
    predicates: id < 10
    table stats: unavailable
    column stats: unavailable
+   parquet statistics predicates: id < 10
    hosts=3 per-host-mem=unavailable
    tuple-ids=0 row-size=16B cardinality=unavailable
 ---- PARALLELPLANS
@@ -94,6 +95,7 @@ PLAN-ROOT SINK
    predicates: id < 10
    table stats: unavailable
    column stats: unavailable
+   parquet statistics predicates: id < 10
    hosts=3 per-host-mem=16.00MB
    tuple-ids=0 row-size=16B cardinality=unavailable
 ====
@@ -110,18 +112,19 @@ PLAN-ROOT SINK
 |  order by: id ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  hosts=3 per-host-mem=unavailable
-|  tuple-ids=3,2 row-size=16B cardinality=unavailable
+|  tuple-ids=4,3 row-size=16B cardinality=unavailable
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST, id ASC
 |  hosts=3 per-host-mem=unavailable
-|  tuple-ids=3 row-size=8B cardinality=unavailable
+|  tuple-ids=4 row-size=8B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    partitions=24/24 files=24 size=156.57KB
    predicates: id < 10
    table stats: unavailable
    column stats: unavailable
+   parquet statistics predicates: id < 10
    hosts=3 per-host-mem=unavailable
    tuple-ids=0 row-size=8B cardinality=unavailable
 ---- PARALLELPLANS
@@ -129,7 +132,7 @@ PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |  hosts=3 per-host-mem=unavailable
-|  tuple-ids=3,2 row-size=16B cardinality=unavailable
+|  tuple-ids=4,3 row-size=16B cardinality=unavailable
 |
 02:ANALYTIC
 |  functions: row_number()
@@ -137,12 +140,12 @@ PLAN-ROOT SINK
 |  order by: id ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  hosts=3 per-host-mem=0B
-|  tuple-ids=3,2 row-size=16B cardinality=unavailable
+|  tuple-ids=4,3 row-size=16B cardinality=unavailable
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST, id ASC
 |  hosts=3 per-host-mem=0B
-|  tuple-ids=3 row-size=8B cardinality=unavailable
+|  tuple-ids=4 row-size=8B cardinality=unavailable
 |
 03:EXCHANGE [HASH(int_col)]
 |  hosts=3 per-host-mem=0B
@@ -153,6 +156,7 @@ PLAN-ROOT SINK
    predicates: id < 10
    table stats: unavailable
    column stats: unavailable
+   parquet statistics predicates: id < 10
    hosts=3 per-host-mem=16.00MB
    tuple-ids=0 row-size=8B cardinality=unavailable
 ====
@@ -206,6 +210,7 @@ PLAN-ROOT SINK
    predicates on o_lineitems: l_linenumber < 3
    table stats: 150000 rows total
    columns missing stats: c_orders
+   parquet statistics predicates: c_custkey < 10
    hosts=3 per-host-mem=unavailable
    tuple-ids=0 row-size=254B cardinality=15000
 ---- PARALLELPLANS
@@ -258,6 +263,7 @@ PLAN-ROOT SINK
    predicates on o_lineitems: l_linenumber < 3
    table stats: 150000 rows total
    columns missing stats: c_orders
+   parquet statistics predicates: c_custkey < 10
    hosts=3 per-host-mem=88.00MB
    tuple-ids=0 row-size=254B cardinality=15000
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index be239fe..6042557 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -159,12 +159,12 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
-|  other predicates: t1.year + t2.smallint_col = t2.tinyint_col, t1.year = t1.month + t2.int_col, t1.year + t2.int_col = t1.month + t2.tinyint_col
+|  other predicates: t1.year = t1.month + t2.int_col, t2.tinyint_col = t1.year + t2.smallint_col, t1.year + t2.int_col = t1.month + t2.tinyint_col
 |  runtime filters: RF000 <- t2.id
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
-|     predicates: 1 = t2.bigint_col
+|     predicates: t2.bigint_col = 1
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
@@ -1415,7 +1415,7 @@ PLAN-ROOT SINK
 |
 04:NESTED LOOP JOIN [LEFT OUTER JOIN]
 |  join predicates: b.id IS DISTINCT FROM c.id
-|  predicates: b.int_col + b.bigint_col = c.int_col
+|  predicates: c.int_col = b.int_col + b.bigint_col
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index cf723d2..90b1713 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -1473,7 +1473,7 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: tt1.month = t1.id
-|  other predicates: zeroifnull(count(tt1.smallint_col)) < t1.id
+|  other predicates: t1.id > zeroifnull(count(tt1.smallint_col))
 |  runtime filters: RF000 <- t1.id
 |
 |--00:SCAN HDFS [functional.alltypestiny t1]
@@ -1535,7 +1535,7 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: s.id = t.id
-|  other predicates: ifnull(sample(int_col), '') = t.string_col
+|  other predicates: t.string_col = ifnull(sample(int_col), '')
 |  runtime filters: RF000 <- t.id
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
@@ -2028,11 +2028,11 @@ PLAN-ROOT SINK
 |
 |--01:SCAN HDFS [functional.alltypesagg a]
 |     partitions=11/11 files=11 size=814.73KB
-|     predicates: 20 <= a.int_col, a.smallint_col >= 10
+|     predicates: a.int_col >= 20, a.smallint_col >= 10
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
-   predicates: 20 >= t.bigint_col, t.string_col <= t.date_string_col
+   predicates: t.bigint_col <= 20, t.string_col <= t.date_string_col
    runtime filters: RF000 -> id
 ====
 # IMPALA-4423: Correlated EXISTS and NOT EXISTS subqueries with aggregates. Both

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test
new file mode 100644
index 0000000..327fba6
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test
@@ -0,0 +1,231 @@
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where int_col < 0
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+set explain_level=2;
+explain select id, bool_col from functional_parquet.alltypessmall where int_col < 0;
+---- RESULTS: VERIFY_IS_SUBSET
+'   parquet statistics predicates: int_col < 0'
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where smallint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where int_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where bigint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where float_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where double_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+# Test with inverted predicate
+select id, bool_col from functional_parquet.alltypessmall where -1 > int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col > 9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where smallint_col > 9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where int_col > 9
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where bigint_col > 90
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where float_col > 9.9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where double_col > 99
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col >= 10
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col <= 0
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 0 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col >= 9
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 0 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col = -1
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where tinyint_col = 10
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+set explain_level=2;
+explain select count(*) from functional_parquet.alltypessmall where tinyint_col = 10
+---- RESULTS: VERIFY_IS_SUBSET
+'   parquet statistics predicates: tinyint_col = 10'
+====
+---- QUERY
+select count(*) from functional_parquet.alltypessmall where id >= 30 and id <= 80
+---- RESULTS
+51
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 1 .*
+====
+---- QUERY
+# Mix with partitioning columns
+select count(*) from functional_parquet.alltypessmall where int_col < 0 and year < 2012
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+# Test that adding a column without stats will not disable stats-based pruning.
+select count(*) from functional_parquet.alltypessmall where int_col < 0 and string_col < ""
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+# Test that without expr rewrite and thus without constant folding, constant exprs still
+# can be used to prune row groups.
+set enable_expr_rewrites=0;
+select id, bool_col from functional_parquet.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+select id, bool_col from functional_parquet.alltypessmall where 5 + 5 < int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+# Test that without expr rewrite and thus without constant folding, constant exprs still
+# can be used to prune row groups.
+set enable_expr_rewrites=0;
+select id, bool_col from functional_parquet.alltypessmall where 5 + 5 < int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/749a55c4/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 73f1aae..be2704d 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -217,6 +217,7 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
 @SkipIfS3.hive
+# TODO: Should we move this to test_parquet_stats.py?
 class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
 
   @classmethod