You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/09/17 05:56:49 UTC

[impala] 02/02: IMPALA-6505: Min-Max predicate push down in ORC scanner

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 35b21083b1866b7056e3810ae5a8daf7bc77ddda
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Tue Mar 10 12:13:00 2020 +0100

    IMPALA-6505: Min-Max predicate push down in ORC scanner
    
    In planning phase, the planner collects and generates min-max predicates
    that can be evaluated on parquet file statistics. We can easily extend
    this on ORC tables.
    
    This commit implements min/max predicate pushdown for the ORC scanner
    leveraging on the external ORC library's search arguments. We build
    the search arguments when we open the scanner as we need not to
    modify them later.
    
    Also added a new query option orc_read_statistics, similar to
    parquet_read_statistics. If the option is set to true (it is by default)
    predicate pushdown will take effect, otherwise it will be skipped. The
    predicates will be evaluated at ORC row group level, i.e. by default for
    every 10,000 rows.
    
    Limitations:
     - Min-max predicates on CHAR/VARCHAR types are not pushed down due to
       inconsistent behaviors on padding/truncating between Hive and Impala.
       (IMPALA-10882)
     - Min-max predicates on TIMESTAMP are not pushed down (IMPALA-10915).
     - Min-max predicates having different arg types are not pushed down
       (IMPALA-10916).
     - Min-max predicates with non-literal const exprs are not pushed down
       since SearchArgument interfaces only accept literals. This only
       happens when expr rewrites are disabled thus constant folding is
       disabled.
    
    Tests:
     - Add e2e tests similar to test_parquet_stats to verify that
       predicates are pushed down.
     - Run CORE tests
     - Run TPCH benchmark, there is no improvement, nor regression.
       On the other hand, certain selective queries gained significant
       speed-up, e.g. select count(*) from lineitem where l_orderkey = 1.
    
    Change-Id: I136622413db21e0941d238ab6aeea901a6464845
    Reviewed-on: http://gerrit.cloudera.org:8080/15403
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-orc-scanner.cc                    | 245 +++++++++-
 be/src/exec/hdfs-orc-scanner.h                     |  26 +
 be/src/exprs/scalar-expr.h                         |   3 +
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   4 +-
 bin/impala-config.sh                               |   4 +-
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   3 +
 .../org/apache/impala/planner/HdfsScanNode.java    |  48 +-
 .../queries/PlannerTest/acid-scans.test            |  12 +-
 .../queries/QueryTest/orc-stats.test               | 542 +++++++++++++++++++++
 tests/query_test/test_nested_types.py              |   3 +-
 tests/query_test/test_orc_stats.py                 |  40 ++
 13 files changed, 915 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index d26839c..ed86477 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -24,6 +24,7 @@
 #include "exec/scanner-context.inline.h"
 #include "exec/scratch-tuple-batch.h"
 #include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/exec-env.h"
 #include "runtime/io/request-context.h"
@@ -138,6 +139,7 @@ HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : HdfsColumnarScanner(scan_node, state),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
     data_batch_pool_(new MemPool(scan_node->mem_tracker())),
+    search_args_pool_(new MemPool(scan_node->mem_tracker())),
     assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
   assemble_rows_timer_.Stop();
 }
@@ -240,6 +242,8 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   // blob more efficiently.
   row_reader_options_.setEnableLazyDecoding(true);
 
+  RETURN_IF_ERROR(PrepareSearchArguments());
+
   // To create OrcColumnReaders, we need the selected orc schema. It's a subset of the
   // file schema: a tree of selected orc types and can only be got from an orc::RowReader
   // (by orc::RowReader::getSelectedType).
@@ -290,6 +294,9 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
     scratch_batch_->ReleaseResources(nullptr);
   }
   orc_root_batch_.reset(nullptr);
+  search_args_pool_->FreeAll();
+
+  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
 
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
@@ -739,8 +746,7 @@ Status HdfsOrcScanner::NextStripe() {
       continue;
     }
 
-    // TODO: check if this stripe can be skipped by stats. e.g. IMPALA-6505 In that case,
-    // set the file row index in 'orc_root_reader_' accordingly.
+    // Set the file row index in 'orc_root_reader_' accordingly.
     if (first_invocation && acid_synthetic_rowid_ != nullptr) {
       orc_root_reader_->SetFileRowIndex(skipped_rows);
     }
@@ -906,4 +912,239 @@ Status HdfsOrcScanner::AssembleCollection(
   coll_items_read_counter_ += tuple_idx;
   return Status::OK();
 }
+
+/// T is the intended ORC primitive type and U is Impala internal primitive type.
+template<typename T, typename U>
+orc::Literal HdfsOrcScanner::GetOrcPrimitiveLiteral(
+    orc::PredicateDataType predicate_type, void* val) {
+  if (UNLIKELY(!val)) return orc::Literal(predicate_type);
+  T* val_dst = reinterpret_cast<T*>(val);
+  return orc::Literal(static_cast<U>(*val_dst));
+}
+
+orc::PredicateDataType HdfsOrcScanner::GetOrcPredicateDataType(const ColumnType& type) {
+  switch (type.type) {
+    case TYPE_BOOLEAN:
+      return orc::PredicateDataType::BOOLEAN;
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+      return orc::PredicateDataType::LONG;
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+      return orc::PredicateDataType::FLOAT;
+    case TYPE_TIMESTAMP:
+      return orc::PredicateDataType::TIMESTAMP;
+    case TYPE_DATE:
+      return orc::PredicateDataType::DATE;
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_FIXED_UDA_INTERMEDIATE:
+      return orc::PredicateDataType::STRING;
+    case TYPE_DECIMAL:
+      return orc::PredicateDataType::DECIMAL;
+    default:
+      DCHECK(false) << "Unsupported type: " << type.DebugString();
+      return orc::PredicateDataType::LONG;
+  }
+}
+
+orc::Literal HdfsOrcScanner::GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
+    const ColumnType& dst_type, orc::PredicateDataType* predicate_type) {
+  DCHECK(eval->root().GetNumChildren() == 2);
+  ScalarExpr* literal_expr = eval->root().GetChild(1);
+  const ColumnType& type = literal_expr->type();
+  DCHECK(literal_expr->IsLiteral());
+  *predicate_type = GetOrcPredicateDataType(type);
+  // Since we want to get a literal value, the second parameter below is not used.
+  void* val = eval->GetValue(*literal_expr, nullptr);
+
+  switch (type.type) {
+    case TYPE_BOOLEAN:
+      return GetOrcPrimitiveLiteral<bool, bool>(*predicate_type, val);
+    case TYPE_TINYINT:
+      return GetOrcPrimitiveLiteral<int8_t, int64_t>(*predicate_type, val);
+    case TYPE_SMALLINT:
+      return GetOrcPrimitiveLiteral<int16_t, int64_t>(*predicate_type, val);
+    case TYPE_INT:
+      return GetOrcPrimitiveLiteral<int32_t, int64_t>(*predicate_type, val);
+    case TYPE_BIGINT:
+      return GetOrcPrimitiveLiteral<int64_t, int64_t>(*predicate_type, val);
+    case TYPE_FLOAT:
+      return GetOrcPrimitiveLiteral<float, double>(*predicate_type, val);
+    case TYPE_DOUBLE:
+      return GetOrcPrimitiveLiteral<double, double>(*predicate_type, val);
+    // Predicates on Timestamp are currently skipped in FE. We will focus on them in
+    // IMPALA-10915.
+    case TYPE_TIMESTAMP: {
+      DCHECK(false) << "Timestamp predicate is not supported: IMPALA-10915";
+      return orc::Literal(predicate_type);
+    }
+    case TYPE_DATE: {
+      if (UNLIKELY(!val)) return orc::Literal(predicate_type);
+      const DateValue* dv = reinterpret_cast<const DateValue*>(val);
+      int32_t value = 0;
+      // The date should be valid at this point.
+      DCHECK(dv->ToDaysSinceEpoch(&value));
+      return orc::Literal(*predicate_type, value);
+    }
+    case TYPE_STRING: {
+      if (UNLIKELY(!val)) return orc::Literal(predicate_type);
+      const StringValue* sv = reinterpret_cast<StringValue*>(val);
+      return orc::Literal(sv->ptr, sv->len);
+    }
+    // Predicates on CHAR/VARCHAR are currently skipped in FE. We will focus on them in
+    // IMPALA-10882.
+    case TYPE_VARCHAR: {
+      DCHECK(false) << "Varchar predicate is not supported: IMPALA-10882";
+      return orc::Literal(predicate_type);
+    }
+    case TYPE_CHAR: {
+      DCHECK(false) << "Char predicate is not supported: IMPALA-10882";
+      if (UNLIKELY(!val)) return orc::Literal(predicate_type);
+      const StringValue* sv = reinterpret_cast<StringValue*>(val);
+      char* dst_ptr;
+      if (dst_type.len > sv->len) {
+        dst_ptr = reinterpret_cast<char*>(search_args_pool_->TryAllocate(dst_type.len));
+        if (dst_ptr == nullptr) return orc::Literal(predicate_type);
+        memcpy(dst_ptr, sv->ptr, sv->len);
+        StringValue::PadWithSpaces(dst_ptr, dst_type.len, sv->len);
+      } else {
+        dst_ptr = sv->ptr;
+      }
+      return orc::Literal(dst_ptr, sv->len);
+    }
+    case TYPE_DECIMAL: {
+      if (!val) return orc::Literal(predicate_type);
+      orc::Int128 value;
+      switch (type.GetByteSize()) {
+        case 4: {
+          Decimal4Value* dv4 = reinterpret_cast<Decimal4Value*>(val);
+          value = orc::Int128(dv4->value());
+          break;
+        }
+        case 8: {
+          Decimal8Value* dv8 = reinterpret_cast<Decimal8Value*>(val);
+          value = orc::Int128(dv8->value());
+          break;
+        }
+        case 16: {
+          Decimal16Value* dv16 = reinterpret_cast<Decimal16Value*>(val);
+          value = orc::Int128(static_cast<int64_t>(dv16->value() >> 64), // higher bits
+              static_cast<uint64_t>(dv16->value())); // lower bits
+          break;
+        }
+        default:
+          DCHECK(false) << "Invalid byte size for decimal type: " << type.GetByteSize();
+      }
+      return orc::Literal(value, type.precision, type.scale);
+    }
+    default:
+      DCHECK(false) << "Invalid type";
+      return orc::Literal(orc::PredicateDataType::BOOLEAN);
+  }
+}
+
+Status HdfsOrcScanner::PrepareSearchArguments() {
+  if (!state_->query_options().orc_read_statistics) return Status::OK();
+
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  if (!min_max_tuple_desc) return Status::OK();
+  // TODO(IMPALA-10894): pushing down predicates into the ORC reader will mess up the
+  //  synthetic(fake) row id, because the row index in the returned batch might differ
+  //  from the index in file (due to some rows are skipped).
+  if (acid_synthetic_rowid_ != nullptr) {
+    VLOG_FILE << "Skip pushing down predicates on non-ACID ORC files under an ACID "
+                 "table: " << filename();
+    return Status::OK();
+  }
+
+  // Clone the min/max statistics conjuncts.
+  RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
+      expr_perm_pool_.get(), context_->expr_results_pool(),
+      scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
+
+  std::unique_ptr<orc::SearchArgumentBuilder> sarg =
+      orc::SearchArgumentFactory::newBuilder();
+  bool sargs_supported = false;
+
+  DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
+  for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
+    SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
+    ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
+    ScalarExpr* const_expr = eval->root().GetChild(1);
+    // ORC reader only supports pushing down predicates that constant parts are literal.
+    // We could get non-literal expr if expr rewrites are disabled.
+    if (!const_expr->IsLiteral()) continue;
+    // TODO(IMPALA-10882): push down min-max predicates on CHAR/VARCHAR.
+    if (const_expr->type().type == TYPE_CHAR || const_expr->type().type == TYPE_VARCHAR
+        || slot_desc->type().type == TYPE_CHAR
+        || slot_desc->type().type == TYPE_VARCHAR) {
+      continue;
+    }
+    // TODO(IMPALA-10916): dealing with lhs that is a simple cast expr.
+    if (GetOrcPredicateDataType(slot_desc->type()) !=
+        GetOrcPredicateDataType(const_expr->type())) {
+      continue;
+    }
+
+    //Resolve column path to determine col idx.
+    const orc::Type* node = nullptr;
+    bool pos_field;
+    bool missing_field;
+    RETURN_IF_ERROR(schema_resolver_->ResolveColumn(slot_desc->col_path(),
+       &node, &pos_field, &missing_field));
+
+    if (pos_field || missing_field) continue;
+    // TODO(IMPALA-10882): push down min-max predicates on CHAR/VARCHAR.
+    if (node->getKind() == orc::CHAR || node->getKind() == orc::VARCHAR) continue;
+
+    const string& fn_name = eval->root().function_name();
+    orc::PredicateDataType predicate_type;
+    orc::Literal literal =
+        GetSearchArgumentLiteral(eval, slot_desc->type(), &predicate_type);
+    if (literal.isNull()) {
+      VLOG_QUERY << "Failed to push down predicate " << eval->root().DebugString();
+      continue;
+    }
+
+    if (fn_name == "lt") {
+      sarg->lessThan(node->getColumnId(), predicate_type, literal);
+    } else if (fn_name == "gt") {
+      sarg->startNot()
+          .lessThanEquals(node->getColumnId(), predicate_type, literal)
+          .end();
+    } else if (fn_name == "le") {
+      sarg->lessThanEquals(node->getColumnId(), predicate_type, literal);
+    } else if (fn_name == "ge") {
+      sarg->startNot()
+          .lessThan(node->getColumnId(), predicate_type, literal)
+          .end();
+    } else {
+      DCHECK(false) << "Invalid predicate: " << fn_name;
+      continue;
+    }
+    // If we have reached this far, we have a valid search arg that we can build later.
+    sargs_supported = true;
+  }
+  if (sargs_supported) {
+    try {
+      std::unique_ptr<orc::SearchArgument> final_sarg = sarg->build();
+      VLOG_FILE << "Built search arguments for ORC file: " << filename() << ": "
+          << final_sarg->toString() << ". File schema: " << reader_->getType().toString();
+      row_reader_options_.searchArgument(std::move(final_sarg));
+    } catch (std::exception& e) {
+      string msg = Substitute("Encountered parse error during building search arguments "
+          "in ORC file $0: $1", filename(), e.what());
+      parse_status_ = Status(msg);
+      return parse_status_;
+    }
+  }
+  // Free any expr result allocations accumulated during conjunct evaluation.
+  context_->expr_results_pool()->Clear();
+  return Status::OK();
+}
+
 }
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 3af37de..6f2aacf 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -177,6 +177,12 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// Pool to copy non-dictionary buffer into. This pool is responsible for handling
   /// vector batches that do not necessarily fit into one row batch.
   boost::scoped_ptr<MemPool> data_batch_pool_;
+  /// Pool to copy values into when building search arguments. Freed on Close().
+  boost::scoped_ptr<MemPool> search_args_pool_;
+
+  /// Clone of Min/max statistics conjunct evaluators. Has the same lifetime as
+  /// the scanner. Stored in 'obj_pool_'.
+  vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
 
   std::unique_ptr<OrcSchemaResolver> schema_resolver_ = nullptr;
 
@@ -311,6 +317,26 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
 
   void SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor* slot_desc,
       Tuple* template_tuple);
+
+  /// Clones the min/max conjucts into min_max_conjunct_evals_, then builds ORC search
+  /// arguments from the conjuncts. The search arguments will exist for the lifespan of
+  /// the scanner and need not to be updated.
+  Status PrepareSearchArguments() WARN_UNUSED_RESULT;
+
+  /// Helper function for GetLiteralSearchArguments. The template parameter T is the
+  /// type of val, and U is the destination type that the constructor of orc::Literal
+  /// accepts. The conversion is required here, since otherwise multiple implicit
+  /// conversions would be possible.
+  template<typename T, typename U>
+  orc::Literal GetOrcPrimitiveLiteral(orc::PredicateDataType predicate_type, void* val);
+
+  /// Helper function for mapping ColumnType to orc::PredicateDataType.
+  static orc::PredicateDataType GetOrcPredicateDataType(const ColumnType& type);
+
+  /// Returns the literal from the min/max conjucts, with the assumption that the
+  /// evaluator has exactly two children, where the second is a literal.
+  orc::Literal GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
+      const ColumnType& dst_type, orc::PredicateDataType* predicate_type);
 };
 
 } // namespace impala
diff --git a/be/src/exprs/scalar-expr.h b/be/src/exprs/scalar-expr.h
index 341a467..a148cc5 100644
--- a/be/src/exprs/scalar-expr.h
+++ b/be/src/exprs/scalar-expr.h
@@ -235,6 +235,9 @@ class ScalarExpr : public Expr {
   friend class ScalarFnCall;
   friend class SlotRef;
 
+  // The ORC scanner builds search arguments based on the values it gets from Literal.
+  friend class HdfsOrcScanner;
+
   /// For BE tests
   friend class ExprTest;
   friend class ExprCodegenTest;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 4e431a2..db34c66 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -525,6 +525,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_parquet_read_statistics(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::ORC_READ_STATISTICS: {
+        query_options->__set_orc_read_statistics(IsTrue(value));
+        break;
+      }
       case TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE: {
         TJoinDistributionMode::type enum_type;
         RETURN_IF_ERROR(GetThriftEnum(value, "default join distribution mode",
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 65113cb..fb1b670 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PARQUET_BLOOM_FILTER_WRITE + 1);\
+      TImpalaQueryOptions::ORC_READ_STATISTICS+ 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -259,6 +259,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS,\
+      TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index de3f037..a5d666a 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=21-dd7509fc38
+export IMPALA_TOOLCHAIN_BUILD_ID=32-edf8115953
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -134,7 +134,7 @@ export IMPALA_OPENLDAP_VERSION=2.4.47
 unset IMPALA_OPENLDAP_URL
 export IMPALA_OPENSSL_VERSION=1.0.2l
 unset IMPALA_OPENSSL_URL
-export IMPALA_ORC_VERSION=1.6.2-p11
+export IMPALA_ORC_VERSION=2667f2996b75e879e52365edfd06b05da4eda941
 unset IMPALA_ORC_URL
 export IMPALA_PROTOBUF_VERSION=3.5.1
 unset IMPALA_PROTOBUF_URL
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index ad0d1fc..4d560d3 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -693,6 +693,9 @@ enum TImpalaQueryOptions {
   //     ALWAYS     - always write Parquet Bloom filters, even if the row group is fully
   //                  dictionary encoded
   PARQUET_BLOOM_FILTER_WRITE = 134
+
+  // Indicates whether to use ORC's search argument to push down predicates.
+  ORC_READ_STATISTICS = 135
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index f5c3cd0..849944d 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -546,6 +546,9 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   135: optional TParquetBloomFilterWrite parquet_bloom_filter_write =
       TParquetBloomFilterWrite.IF_NO_DICT;
+
+  // Indicates whether to use ORC's search argument to push down predicates.
+  136: optional bool orc_read_statistics = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index f8dcb68..aac9502 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -421,6 +421,14 @@ public class HdfsScanNode extends ScanNode {
       }
     }
 
+    if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+      // Compute min-max conjuncts only if the ORC_READ_STATISTICS query option is
+      // set to true.
+      if (analyzer.getQueryOptions().orc_read_statistics) {
+        computeMinMaxTupleAndConjuncts(analyzer);
+      }
+    }
+
     if (canApplyCountStarOptimization(analyzer, fileFormats_)) {
       Preconditions.checkState(desc_.getPath().destTable() != null);
       Preconditions.checkState(collectionConjuncts_.isEmpty());
@@ -544,10 +552,11 @@ public class HdfsScanNode extends ScanNode {
     SlotRef slotRef = binaryPred.getChild(0).unwrapSlotRef(true);
     if (slotRef == null) return;
 
+    SlotDescriptor slotDesc = slotRef.getDesc();
     // This node is a table scan, so this must be a scanning slot.
-    Preconditions.checkState(slotRef.getDesc().isScanSlot());
+    Preconditions.checkState(slotDesc.isScanSlot());
     // Skip the slot ref if it refers to an array's "pos" field.
-    if (slotRef.getDesc().isArrayPosRef()) return;
+    if (slotDesc.isArrayPosRef()) return;
 
     Expr constExpr = binaryPred.getChild(1);
     // Only constant exprs can be evaluated against parquet::Statistics. This includes
@@ -555,11 +564,18 @@ public class HdfsScanNode extends ScanNode {
     if (!constExpr.isConstant()) return;
     if (Expr.IS_NULL_VALUE.apply(constExpr)) return;
 
+    // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
+    // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
+    if (fileFormats_.contains(HdfsFileFormat.ORC) &&
+        (slotDesc.getType() == Type.CHAR || slotDesc.getType() == Type.VARCHAR ||
+            slotDesc.getType() == Type.TIMESTAMP)) {
+      return;
+    }
     if (BinaryPredicate.IS_RANGE_PREDICATE.apply(binaryPred)) {
-      addMinMaxOriginalConjunct(slotRef.getDesc().getParent(), binaryPred);
+      addMinMaxOriginalConjunct(slotDesc.getParent(), binaryPred);
       buildStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
     } else if (BinaryPredicate.IS_EQ_PREDICATE.apply(binaryPred)) {
-      addMinMaxOriginalConjunct(slotRef.getDesc().getParent(), binaryPred);
+      addMinMaxOriginalConjunct(slotDesc.getParent(), binaryPred);
       // TODO: this could be optimized for boolean columns.
       buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.LE);
       buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.GE);
@@ -570,10 +586,18 @@ public class HdfsScanNode extends ScanNode {
     // Retrieve the left side of the IN predicate. It must be a simple slot to proceed.
     SlotRef slotRef = inPred.getBoundSlot();
     if (slotRef == null) return;
+    SlotDescriptor slotDesc = slotRef.getDesc();
     // This node is a table scan, so this must be a scanning slot.
-    Preconditions.checkState(slotRef.getDesc().isScanSlot());
+    Preconditions.checkState(slotDesc.isScanSlot());
     // Skip the slot ref if it refers to an array's "pos" field.
-    if (slotRef.getDesc().isArrayPosRef()) return;
+    if (slotDesc.isArrayPosRef()) return;
+    // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
+    // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
+    if (fileFormats_.contains(HdfsFileFormat.ORC) &&
+        (slotDesc.getType() == Type.CHAR || slotDesc.getType() == Type.VARCHAR ||
+            slotDesc.getType() == Type.TIMESTAMP)) {
+      return;
+    }
     if (inPred.isNotIn()) return;
 
     List<Expr> children = inPred.getChildren();
@@ -599,7 +623,7 @@ public class HdfsScanNode extends ScanNode {
     BinaryPredicate maxBound = new BinaryPredicate(BinaryPredicate.Operator.LE,
         children.get(0).clone(), max.clone());
 
-    addMinMaxOriginalConjunct(slotRef.getDesc().getParent(), inPred);
+    addMinMaxOriginalConjunct(slotDesc.getParent(), inPred);
     buildStatsPredicate(analyzer, slotRef, minBound, minBound.getOp());
     buildStatsPredicate(analyzer, slotRef, maxBound, maxBound.getOp());
   }
@@ -1830,13 +1854,19 @@ public class HdfsScanNode extends ScanNode {
         minMaxOriginalConjuncts_.entrySet()) {
       TupleDescriptor tupleDesc = entry.getKey();
       List<Expr> exprs = entry.getValue();
+      String fileFormatStr;
+      if (hasParquet(fileFormats_) && fileFormats_.contains(HdfsFileFormat.ORC)) {
+        fileFormatStr = "parquet/orc";
+      } else {
+        fileFormatStr = hasParquet(fileFormats_) ? "parquet" : "orc";
+      }
       if (tupleDesc == getTupleDesc()) {
         output.append(prefix)
-        .append(String.format("parquet statistics predicates: %s\n",
+        .append(String.format("%s statistics predicates: %s\n", fileFormatStr,
             Expr.getExplainString(exprs, detailLevel)));
       } else {
         output.append(prefix)
-        .append(String.format("parquet statistics predicates on %s: %s\n",
+        .append(String.format("%s statistics predicates on %s: %s\n", fileFormatStr,
             tupleDesc.getAlias(), Expr.getExplainString(exprs, detailLevel)));
       }
     }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
index 4ed5b97..13bfb1f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
@@ -296,7 +296,7 @@ PLAN-ROOT SINK
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
-|  |     tuple-ids=5 row-size=28B cardinality=419
+|  |     tuple-ids=7 row-size=28B cardinality=413
 |  |     in pipelines: 04(GETNEXT)
 |  |
 |  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
@@ -395,7 +395,7 @@ Per-Host Resources: mem-estimate=68.94MB mem-reservation=4.98MB thread-reservati
 |  |
 |  |--09:EXCHANGE [BROADCAST]
 |  |  |  mem-estimate=50.74KB mem-reservation=0B thread-reservation=0
-|  |  |  tuple-ids=5 row-size=28B cardinality=419
+|  |  |  tuple-ids=7 row-size=28B cardinality=413
 |  |  |  in pipelines: 04(GETNEXT)
 |  |  |
 |  |  F03:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
@@ -408,7 +408,7 @@ Per-Host Resources: mem-estimate=68.94MB mem-reservation=4.98MB thread-reservati
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
-|  |     tuple-ids=5 row-size=28B cardinality=419
+|  |     tuple-ids=7 row-size=28B cardinality=413
 |  |     in pipelines: 04(GETNEXT)
 |  |
 |  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows, RANDOM]
@@ -495,7 +495,7 @@ Per-Host Resources: mem-estimate=212.88MB mem-reservation=6.89MB thread-reservat
   |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
   |  |     file formats: [ORC]
   |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
-  |  |     tuple-ids=3 row-size=28B cardinality=419
+  |  |     tuple-ids=5 row-size=28B cardinality=413
   |  |     in pipelines: 04(GETNEXT)
   |  |
   |  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2]
@@ -630,7 +630,7 @@ Per-Host Resources: mem-estimate=49.99MB mem-reservation=1.96MB thread-reservati
   |
   |--08:EXCHANGE [BROADCAST]
   |     mem-estimate=50.74KB mem-reservation=0B thread-reservation=0
-  |     tuple-ids=3 row-size=28B cardinality=419
+  |     tuple-ids=5 row-size=28B cardinality=413
   |     in pipelines: 04(GETNEXT)
   |
   03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2, RANDOM]
@@ -658,7 +658,7 @@ Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservat
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      file formats: [ORC]
      mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
-     tuple-ids=3 row-size=28B cardinality=419
+     tuple-ids=5 row-size=28B cardinality=413
      in pipelines: 04(GETNEXT)
 ====
 # Do a join with itself, but the scanned partitions of 't2' don't have delete delta files.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
new file mode 100644
index 0000000..a53a7d8
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
@@ -0,0 +1,542 @@
+====
+---- QUERY
+select id, bool_col from functional_orc_def.alltypessmall
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+set explain_level=2;
+explain select count(*) from functional_orc_def.alltypessmall where tinyint_col = 10
+---- RESULTS: VERIFY_IS_SUBSET
+'   orc statistics predicates: tinyint_col = CAST(10 AS TINYINT)'
+====
+---- QUERY
+# Test on predicate x < min_val for tinyint. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x <= min_val for tinyint.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col <= 0
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x >= max_val for tinyint.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col >= 9
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x > max_val for tinyint. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col > 9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col = -1
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col = 10
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x < min_val for smallint. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where smallint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x <= min_val for smallint.
+select count(*) from functional_orc_def.alltypessmall where smallint_col <= 0
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x >= max_val for smallint.
+select count(*) from functional_orc_def.alltypessmall where smallint_col >= 9
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x > max_val for smallint. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where smallint_col > 9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where smallint_col = -1
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where smallint_col = 10
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x < min_val for int. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where int_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x <= min_val for int.
+select count(*) from functional_orc_def.alltypessmall where int_col <= 0
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x >= max_val for int.
+select count(*) from functional_orc_def.alltypessmall where int_col >= 9
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x > max_val for int. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where int_col > 9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where int_col = -1
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where int_col = 10
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x < min_val for bigint. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where bigint_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x <= min_val for bigint.
+select count(*) from functional_orc_def.alltypessmall where bigint_col <= 0
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x >= max_val for bigint.
+select count(*) from functional_orc_def.alltypessmall where bigint_col >= 90
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x > max_val for bigint. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where bigint_col > 90
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where bigint_col = -1
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test EQ predicate with values outside the range.
+select count(*) from functional_orc_def.alltypessmall where bigint_col = 100
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x < min_val for float. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where float_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x > max_val for float. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where float_col > 9.9
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x < min_val for double. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where double_col < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x > max_val for double. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where double_col > 99
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x < min_val for string. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where string_col < "0"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x <= min_val for string.
+select count(*) from functional_orc_def.alltypessmall where string_col <= "0"
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x >= max_val for string.
+select count(*) from functional_orc_def.alltypessmall where string_col >= "9"
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x > max_val for string. No rows should be read by the ORC reader.
+select count(*) from functional_orc_def.alltypessmall where string_col > "9"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on TIMESTAMP predicates. Currently they are not pushed down (IMPALA-10915).
+select count(*) from functional_orc_def.alltypessmall where timestamp_col < "2009-01-01 00:00:00"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on TIMESTAMP predicates. Currently they are not pushed down (IMPALA-10915).
+select count(*) from functional_orc_def.alltypessmall where timestamp_col > "2009-04-03 00:24:00.96"
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test on predicate x < min_val for decimal(9,0).
+# Due to ORC-517 not included in the current Hive version (3.1.3000.7.2.12.0-104),
+# the ORC files have wrong statistics on d1 column showing that its minimum is 0.
+# So we still see RowsRead=5 here.
+select count(*) from functional_orc_def.decimal_tbl where d1 < 1234
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d1 < 0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x > max_val for decimal(9,0). No rows should be read by the ORC reader.
+# Use 132842.0 instead of 132842 to workaround IMPALA-10916.
+select count(*) from functional_orc_def.decimal_tbl where d1 > 132842.0
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x < min_val for decimal(20,10). No rows should be read by the ORC
+# reader.
+select count(*) from functional_orc_def.decimal_tbl where d3 < 1.23456789
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on predicate x > max_val for decimal(20,10). No rows should be read by the ORC
+# reader.
+select count(*) from functional_orc_def.decimal_tbl where d3 > 12345.6789
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on EQ predicate with a value out of the range.
+select count(*) from functional_orc_def.decimal_tbl where d3 = 1.23456788
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test on EQ predicate with a value out of the range.
+select count(*) from functional_orc_def.decimal_tbl where d3 = 12345.6799
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d4 > 0.123456789
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d4 < 0.123456789
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d4 >= 0.12345678
+---- RESULTS
+5
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d4 >= 0.12345679
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test with inverted predicate
+select id, bool_col from functional_orc_def.alltypessmall where -1 > int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Mix with partitioning columns
+select count(*) from functional_orc_def.alltypessmall where int_col < 0 and year < 2012
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select id, bool_col from functional_orc_def.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select id, bool_col from functional_orc_def.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Without expr rewrite and thus without constant folding, predicates with const exprs
+# won't be pushed down.
+set enable_expr_rewrites=0;
+select id, bool_col from functional_orc_def.alltypessmall where int_col < 3 - 3
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+select id, bool_col from functional_orc_def.alltypessmall where 5 + 5 < int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Without expr rewrite and thus without constant folding, predicates with const exprs
+# won't be pushed down.
+set enable_expr_rewrites=0;
+select id, bool_col from functional_orc_def.alltypessmall where 5 + 5 < int_col
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Query that has an implicit cast to a larger integer type
+select count(*) from functional_orc_def.alltypessmall where tinyint_col > 1000000000000
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Predicates with explicit casts are not supported when evaluating orc statistics.
+select count(*) from functional_orc_def.alltypessmall where '0' > cast(tinyint_col as string)
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Explicit casts between numerical types can violate the transitivity of "min()", so they
+# are not supported when evaluating orc statistics.
+select count(*) from functional_orc_def.alltypes where cast(id as tinyint) < 10;
+---- RESULTS
+3878
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 7300
+====
+---- QUERY
+# Predicates on array position can't be pushed down into the orc reader.
+select count(*) from functional_orc_def.complextypestbl.int_array where pos < 5;
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 8
+====
+---- QUERY
+# Test the conversion of constant IN lists to min/max predicats
+select count(*) from functional_orc_def.alltypes where int_col in (-1,-2,-3,-4);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypes where id IN (1,25,49);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 310
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col < '0001-06-19';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col > '2018-12-31';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test predicates on CHAR type. They are not pushed down (IMPALA-10882). Just make sure
+# we don't hit DCHECKs.
+select count(*) from functional_orc_def.chars_tiny where cs < cast('1aaaa' as char(5));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 9
+====
+---- QUERY
+# Test predicates on CHAR type. They are not pushed down (IMPALA-10882). Just make sure
+# we don't hit DCHECKs.
+select count(*) from functional_orc_def.chars_tiny where cs > cast('a' as char(5));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 9
+====
+---- QUERY
+# Test predicates on VARCHAR type. They are not pushed down (IMPALA-10882). Just make sure
+# we don't hit DCHECKs.
+select count(*) from functional_orc_def.chars_tiny where vc < cast('1cccc' as varchar(32));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 9
+====
+---- QUERY
+# Test predicates on VARCHAR type. They are not pushed down (IMPALA-10882). Just make sure
+# we don't hit DCHECKs.
+select count(*) from functional_orc_def.chars_tiny where vc > cast('c' as varchar(32));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 9
+====
+---- QUERY
+# Test that stats support can be disabled using the orc_read_statistics query option.
+set orc_read_statistics=0;
+select count(*) from functional_orc_def.alltypes where id < 0;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 7300
+====
+---- QUERY
+# Test on a larger ORC file that has multiple stripes and each stripe has multiple row
+# groups.
+select count(*) from tpch_orc_def.lineitem where l_orderkey = 1609411;
+---- RESULTS
+7
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 13501
+====
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index f00b697..f202843 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -236,8 +236,7 @@ class TestNestedTypesNoMtDop(ImpalaTestSuite):
   def test_parquet_stats(self, vector):
     """Queries that test evaluation of Parquet row group statistics."""
     if vector.get_value('table_format').file_format == 'orc':
-      pytest.skip('Predicate push down on ORC stripe statistics is not supported' +
-                  '(IMPALA-6505)')
+      pytest.skip('This test is specific to Parquet')
     self.run_test_case('QueryTest/nested-types-parquet-stats', vector)
 
   @SkipIfIsilon.hive
diff --git a/tests/query_test/test_orc_stats.py b/tests/query_test/test_orc_stats.py
new file mode 100644
index 0000000..25cb106
--- /dev/null
+++ b/tests/query_test/test_orc_stats.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestOrcStats(ImpalaTestSuite):
+  """
+  This suite tests runtime optimizations based on ORC statistics.
+  """
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestOrcStats, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'orc')
+
+  def test_orc_stats(self, vector, unique_database):
+    # The test makes assumptions about the number of rows read from the orc-reader
+    # inside a fragment, so we ensure that the tests run in a single fragment.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/orc-stats', vector, use_db=unique_database)