Posted to commits@impala.apache.org by jo...@apache.org on 2021/10/21 17:32:38 UTC

[impala] 02/04: IMPALA-10873: Push down EQUALS, IS NULL and IN-list predicate to ORC reader

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c127b6b1a72d83778378dc181d9328b4cb7dea9e
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Aug 27 15:39:52 2021 +0800

    IMPALA-10873: Push down EQUALS, IS NULL and IN-list predicate to ORC reader
    
    This patch pushes down more kinds of predicates into the ORC reader,
    including EQUALS, IN-list, and IS-NULL predicates, which brings the
    following improvements:
     - EQUALS and IN-list predicates can be evaluated inside the ORC reader
       with bloom filters in the ORC files.
     - Compared to the Parquet scanner, which converts an IN-list predicate
       into two binary predicates (i.e. LE and GE), the ORC reader can
       leverage IN-list predicates directly to skip ORC RowGroups. E.g. a
       RowGroup with int column 'x' in range [1, 100] will be skipped if we
       push down the predicate "x in (0, 101)". See the sketch after this
       list.
     - IS-NULL predicates (including IS-NOT-NULL) can also be used in the
       ORC reader to skip RowGroups.
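    
    The following is a minimal sketch (not part of this patch) of the ORC C++
    library calls that the pushdown relies on, assuming the searchArgument()
    reader option added in ORC 1.7 and a hypothetical column id 1 for the int
    column 'x':
    
      #include <cstdint>
      #include <memory>
      #include <vector>
      #include <orc/OrcFile.hh>
      #include <orc/sargs/SearchArgument.hh>
    
      std::unique_ptr<orc::SearchArgument> BuildInListSarg() {
        auto builder = orc::SearchArgumentFactory::newBuilder();
        // x IN (0, 101): a RowGroup whose statistics say 'x' is in [1, 100]
        // cannot match, so the reader skips it without decoding any rows.
        std::vector<orc::Literal> in_list{
            orc::Literal(static_cast<int64_t>(0)),
            orc::Literal(static_cast<int64_t>(101))};
        builder->in(/*columnId=*/1, orc::PredicateDataType::LONG, in_list);
        return builder->build();
      }
    
      void AttachSarg(orc::RowReaderOptions* opts) {
        // The reader evaluates the search argument against RowGroup statistics
        // (and bloom filters, if present) before materializing rows.
        opts->searchArgument(BuildInListSarg());
      }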
    
    Implementation:
    FE will collect these kinds of predicates into 'min_max_conjuncts' of
    THdfsScanNode. To better reflect the meaning, 'min_max_conjuncts' is
    renamed to 'stats_conjuncts'. Same for other related variable names.
    
    The Parquet scanner will only pick binary min-max conjuncts (i.e. LT, GT,
    LE, and GE) to keep the existing behavior. The ORC scanner will build a
    SearchArgument based on all of these conjuncts.
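    
    As a rough illustration (a condensed sketch of the hdfs-orc-scanner.cc
    changes below, not a verbatim copy), each binary and IS-NULL stats
    conjunct maps onto the ORC builder roughly as follows (IN-list conjuncts
    go through the in() call shown earlier); the builder has no greaterThan
    or isNotNull call, so those forms are expressed with startNot()/end():
    
      #include <cstdint>
      #include <string>
      #include <orc/sargs/SearchArgument.hh>
    
      // Illustrative helper (not Impala code): 'col_id', 'type' and 'lit'
      // come from the resolved file column and the conjunct's literal child.
      void ApplyStatsConjunct(const std::string& fn_name, uint64_t col_id,
          orc::PredicateDataType type, const orc::Literal& lit,
          orc::SearchArgumentBuilder* sarg) {
        if (fn_name == "lt") {
          sarg->lessThan(col_id, type, lit);
        } else if (fn_name == "le") {
          sarg->lessThanEquals(col_id, type, lit);
        } else if (fn_name == "gt") {
          sarg->startNot().lessThanEquals(col_id, type, lit).end();  // NOT(x <= v)
        } else if (fn_name == "ge") {
          sarg->startNot().lessThan(col_id, type, lit).end();        // NOT(x < v)
        } else if (fn_name == "eq") {
          sarg->equals(col_id, type, lit);
        } else if (fn_name == "is_null_pred") {
          sarg->isNull(col_id, type);
        } else if (fn_name == "is_not_null_pred") {
          sarg->startNot().isNull(col_id, type).end();
        }
      }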
    
    Tests
     * Add a new test table 'alltypessmall_bool_sorted' which has files
       containing sorted bool values.
     * Add tests in orc-stats.test
    
    Change-Id: Iaa89f080fe2e87d94fc8ea7f1be83e087fa34225
    Reviewed-on: http://gerrit.cloudera.org:8080/17815
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
---
 be/src/exec/hdfs-orc-scanner.cc                    | 167 +++++++----
 be/src/exec/hdfs-orc-scanner.h                     |  22 +-
 be/src/exec/hdfs-scan-node-base.cc                 |  46 +--
 be/src/exec/hdfs-scan-node-base.h                  |  22 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  53 ++--
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  18 +-
 bin/impala-config.sh                               |   4 +-
 common/thrift/PlanNodes.thrift                     |  10 +-
 .../org/apache/impala/planner/HdfsScanNode.java    | 208 ++++++++-----
 .../functional/functional_schema_template.sql      |  29 ++
 .../datasets/functional/schema_constraints.csv     |   3 +
 .../queries/QueryTest/orc-stats.test               | 321 +++++++++++++++++++++
 12 files changed, 694 insertions(+), 209 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index b02908c..3ac7361 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -301,7 +301,7 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
   orc_root_batch_.reset(nullptr);
   search_args_pool_->FreeAll();
 
-  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
+  ScalarExprEvaluator::Close(stats_conjunct_evals_, state_);
 
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
@@ -953,9 +953,10 @@ orc::PredicateDataType HdfsOrcScanner::GetOrcPredicateDataType(const ColumnType&
 }
 
 orc::Literal HdfsOrcScanner::GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
-    const ColumnType& dst_type, orc::PredicateDataType* predicate_type) {
-  DCHECK(eval->root().GetNumChildren() == 2);
-  ScalarExpr* literal_expr = eval->root().GetChild(1);
+    int child_idx, const ColumnType& dst_type, orc::PredicateDataType* predicate_type) {
+  DCHECK_GE(child_idx, 1);
+  DCHECK_LT(child_idx, eval->root().GetNumChildren());
+  ScalarExpr* literal_expr = eval->root().GetChild(child_idx);
   const ColumnType& type = literal_expr->type();
   DCHECK(literal_expr->IsLiteral());
   *predicate_type = GetOrcPredicateDataType(type);
@@ -1049,79 +1050,141 @@ orc::Literal HdfsOrcScanner::GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
   }
 }
 
+bool HdfsOrcScanner::PrepareBinaryPredicate(const string& fn_name, uint64_t orc_column_id,
+    const ColumnType& type, ScalarExprEvaluator* eval,
+    orc::SearchArgumentBuilder* sarg) {
+  orc::PredicateDataType predicate_type;
+  orc::Literal literal = GetSearchArgumentLiteral(eval, /*child_idx*/1, type,
+      &predicate_type);
+  if (literal.isNull()) {
+    VLOG_FILE << "Failed to push down predicate " << eval->root().DebugString();
+    return false;
+  }
+  if (fn_name == "lt") {
+    sarg->lessThan(orc_column_id, predicate_type, literal);
+  } else if (fn_name == "gt") {
+    sarg->startNot()
+        .lessThanEquals(orc_column_id, predicate_type, literal)
+        .end();
+  } else if (fn_name == "le") {
+    sarg->lessThanEquals(orc_column_id, predicate_type, literal);
+  } else if (fn_name == "ge") {
+    sarg->startNot()
+        .lessThan(orc_column_id, predicate_type, literal)
+        .end();
+  } else if (fn_name == "eq") {
+    sarg->equals(orc_column_id, predicate_type, literal);
+  } else {
+    return false;
+  }
+  return true;
+}
+
+bool HdfsOrcScanner::PrepareInListPredicate(uint64_t orc_column_id,
+    const ColumnType& type, ScalarExprEvaluator* eval,
+    orc::SearchArgumentBuilder* sarg) {
+  std::vector<orc::Literal> in_list;
+  // Initialize 'predicate_type' to avoid clang-tidy warning.
+  orc::PredicateDataType predicate_type = orc::PredicateDataType::BOOLEAN;
+  for (int i = 1; i < eval->root().children().size(); ++i) {
+    // ORC reader only supports pushing down predicates whose constant parts are literal.
+    // FE shouldn't push down any non-literal expr here.
+    DCHECK(eval->root().GetChild(i)->IsLiteral())
+        << "Non-literal constant expr cannot be used";
+    in_list.emplace_back(GetSearchArgumentLiteral(eval, i, type, &predicate_type));
+  }
+  // The ORC library requires that an IN-list has at least 2 literals. Convert to
+  // EQUALS when there is only one.
+  if (in_list.size() == 1) {
+    sarg->equals(orc_column_id, predicate_type, in_list[0]);
+  } else if (in_list.size() > 1) {
+    sarg->in(orc_column_id, predicate_type, in_list);
+  } else {
+    DCHECK(false) << "Empty IN-list should cause syntax error";
+    return false;
+  }
+  return true;
+}
+
+void HdfsOrcScanner::PrepareIsNullPredicate(bool is_not_null, uint64_t orc_column_id,
+    const ColumnType& type, orc::SearchArgumentBuilder* sarg) {
+  orc::PredicateDataType orc_type = GetOrcPredicateDataType(type);
+  if (is_not_null) {
+    sarg->startNot()
+        .isNull(orc_column_id, orc_type)
+        .end();
+  } else {
+    sarg->isNull(orc_column_id, orc_type);
+  }
+}
+
 Status HdfsOrcScanner::PrepareSearchArguments() {
   if (!state_->query_options().orc_read_statistics) return Status::OK();
 
-  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
-  if (!min_max_tuple_desc) return Status::OK();
+  const TupleDescriptor* stats_tuple_desc = scan_node_->stats_tuple_desc();
+  if (!stats_tuple_desc) return Status::OK();
 
   // Clone the min/max statistics conjuncts.
   RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
       expr_perm_pool_.get(), context_->expr_results_pool(),
-      scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
+      scan_node_->stats_conjunct_evals(), &stats_conjunct_evals_));
 
   std::unique_ptr<orc::SearchArgumentBuilder> sarg =
       orc::SearchArgumentFactory::newBuilder();
   bool sargs_supported = false;
+  const orc::Type* node = nullptr;
+  bool pos_field;
+  bool missing_field;
 
-  DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
-  for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
-    SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
-    ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
+  DCHECK_EQ(stats_tuple_desc->slots().size(), stats_conjunct_evals_.size());
+  for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
+    SlotDescriptor* slot_desc = stats_tuple_desc->slots()[i];
+    // Resolve column path to determine col idx in file schema.
+    RETURN_IF_ERROR(schema_resolver_->ResolveColumn(slot_desc->col_path(),
+        &node, &pos_field, &missing_field));
+    if (pos_field || missing_field) continue;
+
+    ScalarExprEvaluator* eval = stats_conjunct_evals_[i];
+    const string& fn_name = eval->root().function_name();
+    if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+      PrepareIsNullPredicate(fn_name == "is_not_null_pred", node->getColumnId(),
+          slot_desc->type(), sarg.get());
+      sargs_supported = true;
+      continue;
+    }
     ScalarExpr* const_expr = eval->root().GetChild(1);
-    // ORC reader only supports pushing down predicates that constant parts are literal.
+    // ORC reader only supports pushing down predicates whose constant parts are literal.
     // We could get non-literal expr if expr rewrites are disabled.
     if (!const_expr->IsLiteral()) continue;
-    // TODO(IMPALA-10882): push down min-max predicates on CHAR/VARCHAR.
-    if (const_expr->type().type == TYPE_CHAR || const_expr->type().type == TYPE_VARCHAR
-        || slot_desc->type().type == TYPE_CHAR
-        || slot_desc->type().type == TYPE_VARCHAR) {
-      continue;
-    }
+    // TODO: push down stats predicates on CHAR/VARCHAR(IMPALA-10882) and
+    //  TIMESTAMP(IMPALA-10915) to ORC reader
+    DCHECK(const_expr->type().type != TYPE_CHAR);
+    DCHECK(const_expr->type().type != TYPE_VARCHAR);
+    DCHECK(const_expr->type().type != TYPE_TIMESTAMP);
+    DCHECK(slot_desc->type().type != TYPE_CHAR);
+    DCHECK(slot_desc->type().type != TYPE_VARCHAR);
+    DCHECK(slot_desc->type().type != TYPE_TIMESTAMP) << "FE should skip such predicates";
     // TODO(IMPALA-10916): dealing with lhs that is a simple cast expr.
     if (GetOrcPredicateDataType(slot_desc->type()) !=
         GetOrcPredicateDataType(const_expr->type())) {
       continue;
     }
-
-    //Resolve column path to determine col idx.
-    const orc::Type* node = nullptr;
-    bool pos_field;
-    bool missing_field;
-    RETURN_IF_ERROR(schema_resolver_->ResolveColumn(slot_desc->col_path(),
-       &node, &pos_field, &missing_field));
-
-    if (pos_field || missing_field) continue;
-    // TODO(IMPALA-10882): push down min-max predicates on CHAR/VARCHAR.
-    if (node->getKind() == orc::CHAR || node->getKind() == orc::VARCHAR) continue;
-
-    const string& fn_name = eval->root().function_name();
-    orc::PredicateDataType predicate_type;
-    orc::Literal literal =
-        GetSearchArgumentLiteral(eval, slot_desc->type(), &predicate_type);
-    if (literal.isNull()) {
-      VLOG_QUERY << "Failed to push down predicate " << eval->root().DebugString();
+    // Skip if the file schema contains unsupported types.
+    // TODO: push down stats predicates on CHAR/VARCHAR(IMPALA-10882) and
+    //  TIMESTAMP(IMPALA-10915) to ORC reader
+    if (node->getKind() == orc::CHAR
+        || node->getKind() == orc::VARCHAR
+        || node->getKind() == orc::TIMESTAMP) {
       continue;
     }
 
-    if (fn_name == "lt") {
-      sarg->lessThan(node->getColumnId(), predicate_type, literal);
-    } else if (fn_name == "gt") {
-      sarg->startNot()
-          .lessThanEquals(node->getColumnId(), predicate_type, literal)
-          .end();
-    } else if (fn_name == "le") {
-      sarg->lessThanEquals(node->getColumnId(), predicate_type, literal);
-    } else if (fn_name == "ge") {
-      sarg->startNot()
-          .lessThan(node->getColumnId(), predicate_type, literal)
-          .end();
-    } else {
-      DCHECK(false) << "Invalid predicate: " << fn_name;
+    if (fn_name == "in_iterate" || fn_name == "in_set_lookup") {
+      sargs_supported |= PrepareInListPredicate(
+          node->getColumnId(), slot_desc->type(), eval, sarg.get());
       continue;
     }
-    // If we have reached this far, we have a valid search arg that we can build later.
-    sargs_supported = true;
+    sargs_supported |= PrepareBinaryPredicate(fn_name, node->getColumnId(),
+        slot_desc->type(), eval, sarg.get());
   }
   if (sargs_supported) {
     try {
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 6f2aacf..0e7c0cb 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -180,9 +180,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// Pool to copy values into when building search arguments. Freed on Close().
   boost::scoped_ptr<MemPool> search_args_pool_;
 
-  /// Clone of Min/max statistics conjunct evaluators. Has the same lifetime as
-  /// the scanner. Stored in 'obj_pool_'.
-  vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
+  /// Clone of statistics conjunct evaluators. Has the same lifetime as the scanner.
+  /// Stored in 'obj_pool_'.
+  vector<ScalarExprEvaluator*> stats_conjunct_evals_;
 
   std::unique_ptr<OrcSchemaResolver> schema_resolver_ = nullptr;
 
@@ -318,7 +318,15 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   void SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor* slot_desc,
       Tuple* template_tuple);
 
-  /// Clones the min/max conjucts into min_max_conjunct_evals_, then builds ORC search
+  bool PrepareBinaryPredicate(const string& fn_name, uint64_t orc_column_id,
+      const ColumnType& type, ScalarExprEvaluator* eval,
+      orc::SearchArgumentBuilder* sarg);
+  bool PrepareInListPredicate(uint64_t orc_column_id, const ColumnType& type,
+      ScalarExprEvaluator* eval, orc::SearchArgumentBuilder* sarg);
+  void PrepareIsNullPredicate(bool is_not_null, uint64_t orc_column_id,
+      const ColumnType& type, orc::SearchArgumentBuilder* sarg);
+
+  /// Clones the stats conjuncts into stats_conjunct_evals_, then builds ORC search
   /// arguments from the conjuncts. The search arguments will exist for the lifespan of
   /// the scanner and need not to be updated.
   Status PrepareSearchArguments() WARN_UNUSED_RESULT;
@@ -333,9 +341,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// Helper function for mapping ColumnType to orc::PredicateDataType.
   static orc::PredicateDataType GetOrcPredicateDataType(const ColumnType& type);
 
-  /// Returns the literal from the min/max conjucts, with the assumption that the
-  /// evaluator has exactly two children, where the second is a literal.
-  orc::Literal GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
+  /// Returns the literal specified by 'child_idx' from the stats conjunct 'eval',
+  /// with the assumption that the specified child is a literal.
+  orc::Literal GetSearchArgumentLiteral(ScalarExprEvaluator* eval, int child_idx,
       const ColumnType& dst_type, orc::PredicateDataType* predicate_type);
 };
 
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index a3a6338..da9f110 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -213,15 +213,15 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   DCHECK(conjuncts_map_[tuple_id].empty());
   conjuncts_map_[tuple_id] = conjuncts_;
 
-  // Add min max conjuncts
-  if (tnode.hdfs_scan_node.__isset.min_max_tuple_id) {
-    TupleDescriptor* min_max_tuple_desc =
-        state->desc_tbl().GetTupleDescriptor(tnode.hdfs_scan_node.min_max_tuple_id);
-    DCHECK(min_max_tuple_desc != nullptr);
-    RowDescriptor* min_max_row_desc = state->obj_pool()->Add(
-        new RowDescriptor(min_max_tuple_desc, /* is_nullable */ false));
-    RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
-        *min_max_row_desc, state, &min_max_conjuncts_));
+  // Add stats conjuncts
+  if (tnode.hdfs_scan_node.__isset.stats_tuple_id) {
+    TupleDescriptor* stats_tuple_desc =
+        state->desc_tbl().GetTupleDescriptor(tnode.hdfs_scan_node.stats_tuple_id);
+    DCHECK(stats_tuple_desc != nullptr);
+    RowDescriptor* stats_row_desc = state->obj_pool()->Add(
+        new RowDescriptor(stats_tuple_desc, /* is_nullable */ false));
+    RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.stats_conjuncts,
+        *stats_row_desc, state, &stats_conjuncts_));
   }
 
   // Transfer overlap predicate descs.
@@ -372,7 +372,7 @@ void HdfsScanPlanNode::Close() {
     if (tid_conjunct.first == tuple_id) continue;
     ScalarExpr::Close(tid_conjunct.second);
   }
-  ScalarExpr::Close(min_max_conjuncts_);
+  ScalarExpr::Close(stats_conjuncts_);
   if (shared_state_.template_pool_.get() != nullptr) {
     shared_state_.template_pool_->FreeAll();
   }
@@ -392,11 +392,11 @@ Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) co
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pnode,
     const THdfsScanNode& hdfs_scan_node, const DescriptorTbl& descs)
   : ScanNode(pool, pnode, descs),
-    min_max_tuple_id_(
-        hdfs_scan_node.__isset.min_max_tuple_id ? hdfs_scan_node.min_max_tuple_id : -1),
-    min_max_conjuncts_(pnode.min_max_conjuncts_),
-    min_max_tuple_desc_(
-        min_max_tuple_id_ == -1 ? nullptr : descs.GetTupleDescriptor(min_max_tuple_id_)),
+    stats_tuple_id_(
+        hdfs_scan_node.__isset.stats_tuple_id ? hdfs_scan_node.stats_tuple_id : -1),
+    stats_conjuncts_(pnode.stats_conjuncts_),
+    stats_tuple_desc_(
+        stats_tuple_id_ == -1 ? nullptr : descs.GetTupleDescriptor(stats_tuple_id_)),
     skip_header_line_count_(hdfs_scan_node.__isset.skip_header_line_count ?
             hdfs_scan_node.skip_header_line_count :
             0),
@@ -441,10 +441,10 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     }
   }
 
-  // Prepare min max statistics conjuncts.
-  if (min_max_tuple_id_ != -1) {
-    RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_,
-        expr_perm_pool(), expr_results_pool(), &min_max_conjunct_evals_));
+  // Prepare stats conjuncts.
+  if (stats_tuple_id_ != -1) {
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(stats_conjuncts_, state, pool_,
+        expr_perm_pool(), expr_results_pool(), &stats_conjunct_evals_));
   }
 
   // Check if reservation was enough to allocate at least one buffer. The
@@ -521,8 +521,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Open(entry.second, state));
   }
 
-  // Open min max conjuncts
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state));
+  // Open stats conjuncts
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(stats_conjunct_evals_, state));
 
   RETURN_IF_ERROR(ClaimBufferReservation(state));
   reader_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext();
@@ -630,8 +630,8 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
     ScalarExprEvaluator::Close(tid_conjunct_eval.second, state);
   }
 
-  // Close min max conjunct
-  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state);
+  // Close stats conjunct
+  ScalarExprEvaluator::Close(stats_conjunct_evals_, state);
   ScanNode::Close(state);
 }
 
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 4036bd9..e3d2ee8 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -276,13 +276,13 @@ class HdfsScanPlanNode : public ScanPlanNode {
   typedef std::unordered_map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap;
   ConjunctsMap conjuncts_map_;
 
-  /// Conjuncts to evaluate on parquet::Statistics.
-  std::vector<ScalarExpr*> min_max_conjuncts_;
+  /// Conjuncts to evaluate on parquet::Statistics or to be pushed down to ORC reader.
+  std::vector<ScalarExpr*> stats_conjuncts_;
 
   /// The list of overlap predicate descs. Each is used to find out whether the range
   /// of values of a data item overlap with a min/max filter. Data structure wise, each
   /// desc is composed of the ID of the min/max filter, the slot index in
-  /// 'min_max_tuple_desc_' to hold the min value of the data item and the actual overlap
+  /// 'stats_tuple_desc_' to hold the min value of the data item and the actual overlap
   /// predicate. The next slot after the slot index implicitly holds the max value of
   /// the data item.
   std::vector<TOverlapPredicateDesc> overlap_predicate_descs_;
@@ -418,13 +418,13 @@ class HdfsScanNodeBase : public ScanNode {
   /// Returns number of partition key slots.
   int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
 
-  int min_max_tuple_id() const { return min_max_tuple_id_; }
+  int stats_tuple_id() const { return stats_tuple_id_; }
 
-  const std::vector<ScalarExprEvaluator*>& min_max_conjunct_evals() const {
-    return min_max_conjunct_evals_;
+  const std::vector<ScalarExprEvaluator*>& stats_conjunct_evals() const {
+    return stats_conjunct_evals_;
   }
 
-  const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; }
+  const TupleDescriptor* stats_tuple_desc() const { return stats_tuple_desc_; }
   const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
   const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
   const AvroSchemaElement& avro_schema() const { return avro_schema_; }
@@ -626,14 +626,14 @@ class HdfsScanNodeBase : public ScanNode {
   friend class HdfsScanner;
 
   /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
-  const int min_max_tuple_id_;
+  const int stats_tuple_id_;
 
   /// Conjuncts to evaluate on parquet::Statistics.
-  const vector<ScalarExpr*>& min_max_conjuncts_;
-  vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
+  const vector<ScalarExpr*>& stats_conjuncts_;
+  vector<ScalarExprEvaluator*> stats_conjunct_evals_;
 
   /// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics.
-  TupleDescriptor* min_max_tuple_desc_ = nullptr;
+  TupleDescriptor* stats_tuple_desc_ = nullptr;
 
   // Number of header lines to skip at the beginning of each file of this table. Only set
   // to values > 0 for hdfs text files.
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index d1df159..b1f0fdb 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -168,7 +168,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));
 
   // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
-  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->stats_tuple_desc();
   if (min_max_tuple_desc != nullptr) {
     int64_t tuple_size = min_max_tuple_desc->byte_size();
     uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
@@ -183,7 +183,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   // Clone the min/max statistics conjuncts.
   RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
       expr_perm_pool_.get(), context_->expr_results_pool(),
-      scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
+      scan_node_->stats_conjunct_evals(), &stats_conjunct_evals_));
 
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
@@ -293,7 +293,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
   }
   if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
 
-  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
+  ScalarExprEvaluator::Close(stats_conjunct_evals_, state_);
 
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
     const FilterStats* stats = filter_ctxs_[i]->stats;
@@ -548,7 +548,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
 
   if (!state_->query_options().parquet_read_statistics) return Status::OK();
 
-  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->stats_tuple_desc();
   if (!min_max_tuple_desc) return Status::OK();
 
   int64_t tuple_size = min_max_tuple_desc->byte_size();
@@ -556,12 +556,13 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
   DCHECK(min_max_tuple_ != nullptr);
   min_max_tuple_->Init(tuple_size);
 
-  DCHECK_GE(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
-  for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
+  DCHECK_GE(min_max_tuple_desc->slots().size(), stats_conjunct_evals_.size());
+  for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
 
     SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
-    ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
-
+    ScalarExprEvaluator* eval = stats_conjunct_evals_[i];
+    const string& fn_name = eval->root().function_name();
+    if (!IsSupportedStatsConjunct(fn_name)) continue;
     bool missing_field = false;
     SchemaNode* node = nullptr;
 
@@ -585,7 +586,6 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
       break;
     }
 
-    const string& fn_name = eval->root().function_name();
     ColumnStatsReader::StatsField stats_field;
     if (!ColumnStatsReader::GetRequiredStatsField(fn_name, &stats_field)) continue;
 
@@ -647,21 +647,21 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
 
   if (!state_->query_options().parquet_read_statistics) return Status::OK();
 
-  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
-  if (GetOverlapPredicateDescs().size() > 0 && !min_max_tuple_desc) {
+  const TupleDescriptor* stats_tuple_desc = scan_node_->stats_tuple_desc();
+  if (GetOverlapPredicateDescs().size() > 0 && !stats_tuple_desc) {
     stringstream err;
-    err << "min_max_tuple_desc is null.";
+    err << "stats_tuple_desc is null.";
     DCHECK(false) << err.str();
     return Status(err.str());
   }
 
   DCHECK(min_max_tuple_ != nullptr);
-  min_max_tuple_->Init(min_max_tuple_desc->byte_size());
+  min_max_tuple_->Init(stats_tuple_desc->byte_size());
 
   // The total number slots in min_max_tuple_ should be equal to or larger than
   // the number of min/max conjuncts. The extra number slots are for the overlap
   // predicates. # min_max_conjuncts + 2 * # overlap predicates = # min_max_slots.
-  DCHECK_GE(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
+  DCHECK_GE(stats_tuple_desc->slots().size(), stats_conjunct_evals_.size());
 
   TMinmaxFilteringLevel::type level = state_->query_options().minmax_filtering_level;
   float threshold = (float)(state_->query_options().minmax_filter_threshold);
@@ -703,7 +703,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
       continue;
     }
 
-    SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[slot_idx];
+    SlotDescriptor* slot_desc = stats_tuple_desc->slots()[slot_idx];
 
     bool missing_field = false;
     SchemaNode* node = nullptr;
@@ -800,7 +800,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
 
 bool HdfsParquetScanner::ShouldProcessPageIndex() {
   if (!state_->query_options().parquet_read_page_index) return false;
-  if (!min_max_conjunct_evals_.empty()) return true;
+  if (!stats_conjunct_evals_.empty()) return true;
   for (auto desc : GetOverlapPredicateDescs()) {
     if (IsFilterWorthyForOverlapCheck(FindFilterIndex(desc.filter_id))) {
       return true;
@@ -1294,9 +1294,9 @@ void HdfsParquetScanner::GetMinMaxSlotsForOverlapPred(
   DCHECK(min_slot);
   DCHECK(max_slot);
   SlotDescriptor* min_slot_desc =
-      scan_node_->min_max_tuple_desc()->slots()[overlap_slot_idx];
+      scan_node_->stats_tuple_desc()->slots()[overlap_slot_idx];
   SlotDescriptor* max_slot_desc =
-      scan_node_->min_max_tuple_desc()->slots()[overlap_slot_idx + 1];
+      scan_node_->stats_tuple_desc()->slots()[overlap_slot_idx + 1];
   *min_slot = min_max_tuple_->GetSlot(min_slot_desc->tuple_offset());
   *max_slot = min_max_tuple_->GetSlot(max_slot_desc->tuple_offset());
 }
@@ -1390,10 +1390,10 @@ void HdfsParquetScanner::CollectSkippedPageRangesForSortedColumn(
 Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
     vector<RowRange>* skip_ranges) {
   DCHECK(skip_ranges);
-  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->stats_tuple_desc();
   if (GetOverlapPredicateDescs().size() > 0 && !min_max_tuple_desc) {
     stringstream err;
-    err << "min_max_tuple_desc is null.";
+    err << "stats_tuple_desc is null.";
     DCHECK(false) << err.str();
     return Status(err.str());
   }
@@ -1495,9 +1495,11 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
   parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
   vector<RowRange> skip_ranges;
 
-  for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
-    ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
-    SlotDescriptor* slot_desc = scan_node_->min_max_tuple_desc()->slots()[i];
+  for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
+    ScalarExprEvaluator* eval = stats_conjunct_evals_[i];
+    const string& fn_name = eval->root().function_name();
+    if (!IsSupportedStatsConjunct(fn_name)) continue;
+    SlotDescriptor* slot_desc = scan_node_->stats_tuple_desc()->slots()[i];
 
     bool missing_field = false;
     SchemaNode* node = nullptr;
@@ -1518,11 +1520,10 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
     parquet::ColumnIndex column_index;
     RETURN_IF_ERROR(page_index_.DeserializeColumnIndex(col_chunk, &column_index));
 
-    min_max_tuple_->Init(scan_node_->min_max_tuple_desc()->byte_size());
+    min_max_tuple_->Init(scan_node_->stats_tuple_desc()->byte_size());
     void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
 
     const int num_of_pages = column_index.null_pages.size();
-    const string& fn_name = eval->root().function_name();
     ColumnStatsReader::StatsField stats_field;
     if (!ColumnStatsReader::GetRequiredStatsField(fn_name, &stats_field)) continue;
 
@@ -1888,7 +1889,7 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
   unordered_map<int, std::unordered_set<std::pair<std::string, const Literal*>>>
       conjunct_halves;
 
-  for (ScalarExprEvaluator* eval : min_max_conjunct_evals_) {
+  for (ScalarExprEvaluator* eval : stats_conjunct_evals_) {
     const ScalarExpr& expr = eval->root();
     const string& function_name = expr.function_name();
 
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 05a2552..cab7d60 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -409,9 +409,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
   Tuple* min_max_tuple_;
 
-  /// Clone of Min/max statistics conjunct evaluators. Has the same life time as
-  /// the scanner. Stored in 'obj_pool_'.
-  vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
+  /// Clone of statistics conjunct evaluators. Has the same life time as the scanner.
+  /// Stored in 'obj_pool_'.
+  vector<ScalarExprEvaluator*> stats_conjunct_evals_;
 
   /// A map from indices of columns that participate in an EQ conjunct to the hash of the
   /// literal value of the EQ conjunct. Used in Parquet Bloom filtering.
@@ -555,6 +555,14 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
 
   virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
+  /// Return true if we can evaluate this type of predicate on parquet statistics.
+  /// FE could populate stats-predicates that can't be evaluated here if the table
+  /// contains both Parquet and ORC format partitions. Here we only pick min-max
+  /// predicates, i.e. <, >, <=, and >=.
+  bool IsSupportedStatsConjunct(std::string fn_name) {
+    return fn_name == "lt" || fn_name == "gt" || fn_name == "le" || fn_name == "ge";
+  }
+
   /// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
   /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
   /// compute the statistics. Sets 'skip_row_group' to true if the row group can be
@@ -677,7 +685,7 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Sets 'filter_pages_' to true if found any page to filter out.
   Status ProcessPageIndex();
 
-  /// Evaluates 'min_max_conjunct_evals_' against the column index and determines the row
+  /// Evaluates 'stats_conjunct_evals_' against the column index and determines the row
   /// ranges that might contain data we are looking for.
   /// Sets 'filter_pages_' to true if found any page to filter out.
   Status EvaluatePageIndex();
@@ -864,7 +872,7 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// provided buffer must be preallocated to hold at least 'size' bytes.
   Status ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64_t size) WARN_UNUSED_RESULT;
 
-  /// Processes 'min_max_conjunct_evals_' to extract equality (EQ) conjuncts. These are
+  /// Processes 'stats_conjunct_evals_' to extract equality (EQ) conjuncts. These are
   /// now represented as two conjuncts: an LE and a GE. This function finds such pairs and
   /// fills the map 'eq_conjunct_info_' with the hash of the literal in the EQ conjunct.
   /// See 'eq_conjunct_info_'.
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 91eb441..6379264 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=32-edf8115953
+export IMPALA_TOOLCHAIN_BUILD_ID=47-5e0072f1e0
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -136,7 +136,7 @@ export IMPALA_OPENLDAP_VERSION=2.4.47
 unset IMPALA_OPENLDAP_URL
 export IMPALA_OPENSSL_VERSION=1.0.2l
 unset IMPALA_OPENSSL_URL
-export IMPALA_ORC_VERSION=2667f2996b75e879e52365edfd06b05da4eda941
+export IMPALA_ORC_VERSION=1.7.0-p3
 unset IMPALA_ORC_URL
 export IMPALA_PROTOBUF_VERSION=3.5.1
 unset IMPALA_PROTOBUF_URL
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 3c48fe8..46db62d 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -304,12 +304,12 @@ struct THdfsScanNode {
   // TODO: Remove this option when the MT scan node supports all file formats.
   6: optional bool use_mt_scan_node
 
-  // Conjuncts that can be evaluated against parquet::Statistics using the tuple
-  // referenced by 'min_max_tuple_id'.
-  7: optional list<Exprs.TExpr> min_max_conjuncts
+  // Conjuncts that can be pushed down to the ORC reader, or be evaluated against
+  // parquet::Statistics using the tuple referenced by 'stats_tuple_id'.
+  7: optional list<Exprs.TExpr> stats_conjuncts
 
-  // Tuple to evaluate 'min_max_conjuncts' against.
-  8: optional Types.TTupleId min_max_tuple_id
+  // Tuple to evaluate 'stats_conjuncts' against.
+  8: optional Types.TTupleId stats_tuple_id
 
   // The conjuncts that are eligible for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index aac9502..b9e84d5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -33,16 +33,14 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.impala.analysis.Analyzer;
-import org.apache.impala.analysis.AlterTableSortByStmt;
 import org.apache.impala.analysis.BinaryPredicate;
-import org.apache.impala.analysis.CompoundPredicate;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InPredicate;
 import org.apache.impala.analysis.IsNotEmptyPredicate;
+import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.MultiAggregateInfo;
-import org.apache.impala.analysis.Path;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
@@ -59,11 +57,9 @@ import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -103,7 +99,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -301,25 +296,25 @@ public class HdfsScanNode extends ScanNode {
   private int numFilesNoDiskIds_ = 0;
   private int numPartitionsNoDiskIds_ = 0;
 
-  // List of conjuncts for min/max values of parquet::Statistics, that are used to skip
-  // data when scanning Parquet files.
-  private final List<Expr> minMaxConjuncts_ = new ArrayList<>();
+  // List of conjuncts for min/max values of Parquet/ORC statistics that are used to skip
+  // data when scanning Parquet/ORC files.
+  private final List<Expr> statsConjuncts_ = new ArrayList<>();
 
   // Map from TupleDescriptor to list of PlanNode conjuncts that have been transformed
-  // into conjuncts in 'minMaxConjuncts_'.
-  private final Map<TupleDescriptor, List<Expr>> minMaxOriginalConjuncts_ =
+  // into conjuncts in 'statsConjuncts_'.
+  private final Map<TupleDescriptor, List<Expr>> statsOriginalConjuncts_ =
       new LinkedHashMap<>();
 
   // Tuple that is used to materialize statistics when scanning Parquet files. For each
   // column it can contain 0, 1, or 2 slots, depending on whether the column needs to be
   // evaluated against the min and/or the max value of the corresponding
   // parquet::Statistics.
-  private TupleDescriptor minMaxTuple_;
+  private TupleDescriptor statsTuple_;
 
   // The list of overlap predicate descs. See TOverlapPredicateDesc in PlanNodes.thrift.
   private ArrayList<TOverlapPredicateDesc> overlapPredicateDescs_ = new ArrayList<>();
 
-  // Index of the first slot in minMaxTuple_ for overlap predicates.
+  // Index of the first slot in statsTuple_ for overlap predicates.
   private int overlap_first_slot_idx_ = -1;
 
   // Slot that is used to record the Parquet metadata for the count(*) aggregation if
@@ -412,7 +407,7 @@ public class HdfsScanNode extends ScanNode {
       // Compute min-max conjuncts only if the PARQUET_READ_STATISTICS query option is
       // set to true.
       if (analyzer.getQueryOptions().parquet_read_statistics) {
-        computeMinMaxTupleAndConjuncts(analyzer);
+        computeStatsTupleAndConjuncts(analyzer);
       }
       // Compute dictionary conjuncts only if the PARQUET_DICTIONARY_FILTERING query
       // option is set to true.
@@ -425,7 +420,7 @@ public class HdfsScanNode extends ScanNode {
       // Compute min-max conjuncts only if the ORC_READ_STATISTICS query option is
       // set to true.
       if (analyzer.getQueryOptions().orc_read_statistics) {
-        computeMinMaxTupleAndConjuncts(analyzer);
+        computeStatsTupleAndConjuncts(analyzer);
       }
     }
 
@@ -526,25 +521,60 @@ public class HdfsScanNode extends ScanNode {
 
   /**
    * Builds a predicate to evaluate against parquet::Statistics by copying 'inputSlot'
-   * into 'minMaxTuple_', combining 'inputSlot', 'inputPred' and 'op' into a new
-   * predicate, and adding it to 'minMaxConjuncts_'.
+   * into 'statsTuple_', combining 'inputSlot', 'inputPred' and 'op' into a new
+   * predicate, and adding it to 'statsConjuncts_'.
    */
-  private void buildStatsPredicate(Analyzer analyzer, SlotRef inputSlot,
+  private void buildBinaryStatsPredicate(Analyzer analyzer, SlotRef inputSlot,
       BinaryPredicate inputPred, BinaryPredicate.Operator op) {
     // Obtain the rhs expr of the input predicate
     Expr constExpr = inputPred.getChild(1);
     Preconditions.checkState(constExpr.isConstant());
 
     // Make a new slot descriptor, which adds it to the tuple descriptor.
-    SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(minMaxTuple_,
+    SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(statsTuple_,
         inputSlot.getDesc());
     SlotRef slot = new SlotRef(slotDesc);
     BinaryPredicate statsPred = new BinaryPredicate(op, slot, constExpr);
     statsPred.analyzeNoThrow(analyzer);
-    minMaxConjuncts_.add(statsPred);
+    statsConjuncts_.add(statsPred);
   }
 
-  private void tryComputeBinaryMinMaxPredicate(Analyzer analyzer,
+  /**
+   * Similar to the above method but builds an IN-list predicate which can be pushed down
+   * to the ORC reader.
+   */
+  private void buildInListStatsPredicate(Analyzer analyzer, SlotRef inputSlot,
+      InPredicate inputPred) {
+    Preconditions.checkState(!inputPred.isNotIn());
+    List<Expr> children = inputPred.getChildren();
+    Preconditions.checkState(inputSlot == children.get(0).unwrapSlotRef(true));
+    List<Expr> inList = Lists.newArrayListWithCapacity(children.size() - 1);
+    for (int i = 1; i < children.size(); ++i) {
+      Expr child = children.get(i);
+      // If any child is not a literal, then nothing can be done
+      if (!Expr.IS_LITERAL.apply(child)) return;
+      if (isUnsupportedStatsType(child.getType())) return;
+      inList.add(child);
+    }
+    // Make a new slot descriptor, which adds it to the tuple descriptor.
+    SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(statsTuple_,
+        inputSlot.getDesc());
+    SlotRef slot = new SlotRef(slotDesc);
+    InPredicate inPred = new InPredicate(slot, inList, inputPred.isNotIn());
+    inPred.analyzeNoThrow(analyzer);
+    statsConjuncts_.add(inPred);
+  }
+
+  private boolean isUnsupportedStatsType(Type type) {
+    // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
+    // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
+    return fileFormats_.contains(HdfsFileFormat.ORC)
+        && (type.getPrimitiveType() == PrimitiveType.CHAR
+            || type.getPrimitiveType() == PrimitiveType.VARCHAR
+            || type.getPrimitiveType() == PrimitiveType.TIMESTAMP);
+  }
+
+  private void tryComputeBinaryStatsPredicate(Analyzer analyzer,
       BinaryPredicate binaryPred) {
     // We only support slot refs on the left hand side of the predicate, a rewriting
     // rule makes sure that all compatible exprs are rewritten into this form. Only
@@ -563,26 +593,29 @@ public class HdfsScanNode extends ScanNode {
     // LiteralExpr, but can also be an expr like "1 + 2".
     if (!constExpr.isConstant()) return;
     if (Expr.IS_NULL_VALUE.apply(constExpr)) return;
+    if (isUnsupportedStatsType(slotDesc.getType())) return;
+    if (isUnsupportedStatsType(constExpr.getType())) return;
 
-    // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
-    // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
-    if (fileFormats_.contains(HdfsFileFormat.ORC) &&
-        (slotDesc.getType() == Type.CHAR || slotDesc.getType() == Type.VARCHAR ||
-            slotDesc.getType() == Type.TIMESTAMP)) {
-      return;
-    }
     if (BinaryPredicate.IS_RANGE_PREDICATE.apply(binaryPred)) {
-      addMinMaxOriginalConjunct(slotDesc.getParent(), binaryPred);
-      buildStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
+      addStatsOriginalConjunct(slotDesc.getParent(), binaryPred);
+      buildBinaryStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
     } else if (BinaryPredicate.IS_EQ_PREDICATE.apply(binaryPred)) {
-      addMinMaxOriginalConjunct(slotDesc.getParent(), binaryPred);
-      // TODO: this could be optimized for boolean columns.
-      buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.LE);
-      buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.GE);
+      addStatsOriginalConjunct(slotDesc.getParent(), binaryPred);
+      if (hasParquet(fileFormats_)) {
+        // TODO: this could be optimized for boolean columns.
+        buildBinaryStatsPredicate(analyzer, slotRef, binaryPred,
+            BinaryPredicate.Operator.LE);
+        buildBinaryStatsPredicate(analyzer, slotRef, binaryPred,
+            BinaryPredicate.Operator.GE);
+      }
+      if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+        // We can push down EQ predicates to the ORC reader directly.
+        buildBinaryStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
+      }
     }
   }
 
-  private void tryComputeInListMinMaxPredicate(Analyzer analyzer, InPredicate inPred) {
+  private void tryComputeInListStatsPredicate(Analyzer analyzer, InPredicate inPred) {
     // Retrieve the left side of the IN predicate. It must be a simple slot to proceed.
     SlotRef slotRef = inPred.getBoundSlot();
     if (slotRef == null) return;
@@ -591,15 +624,14 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkState(slotDesc.isScanSlot());
     // Skip the slot ref if it refers to an array's "pos" field.
     if (slotDesc.isArrayPosRef()) return;
-    // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
-    // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
-    if (fileFormats_.contains(HdfsFileFormat.ORC) &&
-        (slotDesc.getType() == Type.CHAR || slotDesc.getType() == Type.VARCHAR ||
-            slotDesc.getType() == Type.TIMESTAMP)) {
-      return;
-    }
     if (inPred.isNotIn()) return;
+    if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+      if (isUnsupportedStatsType(slotDesc.getType())) return;
+      addStatsOriginalConjunct(slotDesc.getParent(), inPred);
+      buildInListStatsPredicate(analyzer, slotRef, inPred);
+    }
 
+    if (!hasParquet(fileFormats_)) return;
     List<Expr> children = inPred.getChildren();
     LiteralExpr min = null;
     LiteralExpr max = null;
@@ -623,25 +655,44 @@ public class HdfsScanNode extends ScanNode {
     BinaryPredicate maxBound = new BinaryPredicate(BinaryPredicate.Operator.LE,
         children.get(0).clone(), max.clone());
 
-    addMinMaxOriginalConjunct(slotDesc.getParent(), inPred);
-    buildStatsPredicate(analyzer, slotRef, minBound, minBound.getOp());
-    buildStatsPredicate(analyzer, slotRef, maxBound, maxBound.getOp());
+    addStatsOriginalConjunct(slotDesc.getParent(), inPred);
+    buildBinaryStatsPredicate(analyzer, slotRef, minBound, minBound.getOp());
+    buildBinaryStatsPredicate(analyzer, slotRef, maxBound, maxBound.getOp());
   }
 
-  private void addMinMaxOriginalConjunct(TupleDescriptor tupleDesc, Expr expr) {
-    List<Expr> exprs = minMaxOriginalConjuncts_.get(tupleDesc);
-    if (exprs == null) {
-      exprs = new ArrayList<>();
-      minMaxOriginalConjuncts_.put(tupleDesc, exprs);
-    }
+  private void tryComputeIsNullStatsPredicate(Analyzer analyzer,
+      IsNullPredicate isNullPred) {
+    // Currently, only ORC tables can push down IS-NULL predicates.
+    if (!fileFormats_.contains(HdfsFileFormat.ORC)) return;
+    // Retrieve the left side of the IS-NULL predicate. Skip if it's not a simple slot.
+    SlotRef slotRef = isNullPred.getBoundSlot();
+    if (slotRef == null) return;
+    // This node is a table scan, so this must be a scanning slot.
+    Preconditions.checkState(slotRef.getDesc().isScanSlot());
+    // Skip the slot ref if it refers to an array's "pos" field.
+    if (slotRef.getDesc().isArrayPosRef()) return;
+    addStatsOriginalConjunct(slotRef.getDesc().getParent(), isNullPred);
+    SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(statsTuple_,
+        slotRef.getDesc());
+    SlotRef slot = new SlotRef(slotDesc);
+    IsNullPredicate statsPred = new IsNullPredicate(slot, isNullPred.isNotNull());
+    statsPred.analyzeNoThrow(analyzer);
+    statsConjuncts_.add(statsPred);
+  }
+
+  private void addStatsOriginalConjunct(TupleDescriptor tupleDesc, Expr expr) {
+    List<Expr> exprs = statsOriginalConjuncts_.computeIfAbsent(
+        tupleDesc, k -> new ArrayList<>());
     exprs.add(expr);
   }
 
-  private void tryComputeMinMaxPredicate(Analyzer analyzer, Expr pred) {
+  private void tryComputeStatsPredicate(Analyzer analyzer, Expr pred) {
     if (pred instanceof BinaryPredicate) {
-      tryComputeBinaryMinMaxPredicate(analyzer, (BinaryPredicate) pred);
+      tryComputeBinaryStatsPredicate(analyzer, (BinaryPredicate) pred);
     } else if (pred instanceof InPredicate) {
-      tryComputeInListMinMaxPredicate(analyzer, (InPredicate) pred);
+      tryComputeInListStatsPredicate(analyzer, (InPredicate) pred);
+    } else if (pred instanceof IsNullPredicate) {
+      tryComputeIsNullStatsPredicate(analyzer, (IsNullPredicate) pred);
     }
   }
 
@@ -660,28 +711,29 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Analyzes 'conjuncts_' and 'collectionConjuncts_', populates 'minMaxTuple_' with slots
-   * for statistics values, and populates 'minMaxConjuncts_' with conjuncts pointing into
-   * the 'minMaxTuple_'. Only conjuncts of the form <slot> <op> <constant> are supported,
-   * and <op> must be one of LT, LE, GE, GT, or EQ.
+   * Analyzes 'conjuncts_' and 'collectionConjuncts_', populates 'statsTuple_' with slots
+   * for statistics values, and populates 'statsConjuncts_' with conjuncts pointing into
+   * the 'statsTuple_'. Binary conjuncts of the form <slot> <op> <constant> are supported,
+   * and <op> must be one of LT, LE, GE, GT, or EQ. IN-list and IS-NULL conjuncts are also
+   * supported.
    */
-  private void computeMinMaxTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{
+  private void computeStatsTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{
     Preconditions.checkNotNull(desc_.getPath());
     String tupleName = desc_.getPath().toString() + " statistics";
     DescriptorTable descTbl = analyzer.getDescTbl();
-    minMaxTuple_ = descTbl.createTupleDescriptor(tupleName);
-    minMaxTuple_.setPath(desc_.getPath());
+    statsTuple_ = descTbl.createTupleDescriptor(tupleName);
+    statsTuple_.setPath(desc_.getPath());
 
     // Adds predicates for scalar, top-level columns.
-    for (Expr pred: conjuncts_) tryComputeMinMaxPredicate(analyzer, pred);
+    for (Expr pred: conjuncts_) tryComputeStatsPredicate(analyzer, pred);
 
     // Adds predicates for collections.
     for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) {
       if (notEmptyCollections_.contains(entry.getKey())) {
-        for (Expr pred: entry.getValue()) tryComputeMinMaxPredicate(analyzer, pred);
+        for (Expr pred: entry.getValue()) tryComputeStatsPredicate(analyzer, pred);
       }
     }
-    minMaxTuple_.computeMemLayout();
+    statsTuple_.computeMemLayout();
   }
 
   /**
@@ -689,11 +741,11 @@ public class HdfsScanNode extends ScanNode {
    */
   public void initOverlapPredicate(Analyzer analyzer) {
     if (!allParquet_) return;
-    Preconditions.checkNotNull(minMaxTuple_);
+    Preconditions.checkNotNull(statsTuple_);
     // Allow the tuple to accept new slots.
-    minMaxTuple_.resetHasMemoryLayout();
+    statsTuple_.resetHasMemoryLayout();
 
-    overlap_first_slot_idx_ = minMaxTuple_.getSlots().size();
+    overlap_first_slot_idx_ = statsTuple_.getSlots().size();
   }
 
   /**
@@ -799,7 +851,7 @@ public class HdfsScanNode extends ScanNode {
     // Check if targetExpr refers to some column in one of the min/max conjuncts
     // already formed. If so, do not add an overlap predicate as it may not be
     // as effective as the conjunct.
-    List<SlotDescriptor> slotDescs = minMaxTuple_.getSlots();
+    List<SlotDescriptor> slotDescs = statsTuple_.getSlots();
     for (int i=0; i<overlap_first_slot_idx_; i++) {
       if (slotDescs.get(i).getPath() == slotRefInScan.getDesc().getPath())
         return false;
@@ -837,13 +889,13 @@ public class HdfsScanNode extends ScanNode {
       }
     }
 
-    int firstSlotIdx = minMaxTuple_.getSlots().size();
+    int firstSlotIdx = statsTuple_.getSlots().size();
     // Make two new slot descriptors to hold data min/max values (such as from
     // row groups or pages in Parquet)) and append them to the tuple descriptor.
     SlotDescriptor slotDescDataMin =
-        analyzer.getDescTbl().copySlotDescriptor(minMaxTuple_, slotRefInScan.getDesc());
+        analyzer.getDescTbl().copySlotDescriptor(statsTuple_, slotRefInScan.getDesc());
     SlotDescriptor slotDescDataMax =
-        analyzer.getDescTbl().copySlotDescriptor(minMaxTuple_, slotRefInScan.getDesc());
+        analyzer.getDescTbl().copySlotDescriptor(statsTuple_, slotRefInScan.getDesc());
 
     overlapPredicateDescs_.add(
         new TOverlapPredicateDesc(filter.getFilterId().asInt(), firstSlotIdx));
@@ -854,7 +906,7 @@ public class HdfsScanNode extends ScanNode {
   public void finalizeOverlapPredicate() {
     if (!allParquet_) return;
     // Recompute the memory layout for the min/max tuple.
-    minMaxTuple_.computeMemLayout();
+    statsTuple_.computeMemLayout();
   }
 
   /**
@@ -1697,14 +1749,14 @@ public class HdfsScanNode extends ScanNode {
       msg.hdfs_scan_node.setParquet_count_star_slot_offset(
           countStarSlot_.getByteOffset());
     }
-    if (!minMaxConjuncts_.isEmpty()) {
-      for (Expr e: minMaxConjuncts_) {
-        msg.hdfs_scan_node.addToMin_max_conjuncts(e.treeToThrift());
+    if (!statsConjuncts_.isEmpty()) {
+      for (Expr e: statsConjuncts_) {
+        msg.hdfs_scan_node.addToStats_conjuncts(e.treeToThrift());
       }
     }
 
-    if ( minMaxTuple_ != null ) {
-      msg.hdfs_scan_node.setMin_max_tuple_id(minMaxTuple_.getId().asInt());
+    if (statsTuple_ != null) {
+      msg.hdfs_scan_node.setStats_tuple_id(statsTuple_.getId().asInt());
     }
 
     Map<Integer, List<Integer>> dictMap = new LinkedHashMap<>();
@@ -1851,7 +1903,7 @@ public class HdfsScanNode extends ScanNode {
       String prefix, TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();
     for (Map.Entry<TupleDescriptor, List<Expr>> entry :
-        minMaxOriginalConjuncts_.entrySet()) {
+        statsOriginalConjuncts_.entrySet()) {
       TupleDescriptor tupleDesc = entry.getKey();
       List<Expr> exprs = entry.getValue();
       String fileFormatStr;
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 78094a3..52c16cc 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -183,6 +183,35 @@ FROM {db_name}.{table_name};
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+alltypessmall_bool_sorted
+---- PARTITION_COLUMNS
+year int
+month int
+---- COLUMNS
+id int
+bool_col boolean
+tinyint_col tinyint
+smallint_col smallint
+int_col int
+bigint_col bigint
+float_col float
+double_col double
+date_string_col string
+string_col string
+timestamp_col timestamp
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} partition (year, month)
+SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+FROM {db_name}.alltypessmall
+where bool_col;
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} partition (year, month)
+SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+FROM {db_name}.alltypessmall
+where not bool_col;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 alltypestiny
 ---- PARTITION_COLUMNS
 year int
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index c359af1..87e5e61 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -309,3 +309,6 @@ table_name:uncomp_src_alltypes, constraint:restrict_to, table_format:orc/def/blo
 table_name:uncomp_src_decimal_tbl, constraint:restrict_to, table_format:orc/def/block
 
 table_name:part_strings_with_quotes, constraint:restrict_to, table_format:text/none/none
+
+# 'alltypessmall_bool_sorted' is only used in ORC tests.
+table_name:alltypessmall_bool_sorted, constraint:restrict_to, table_format:orc/def/block
diff --git a/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
index a53a7d8..c878a29 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
@@ -203,6 +203,20 @@ select count(*) from functional_orc_def.alltypessmall where bigint_col = 100
 aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col = true;
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col = false;
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
 # Test on predicate x < min_val for float. No rows should be read by the ORC reader.
 select count(*) from functional_orc_def.alltypessmall where float_col < 0
 ---- RESULTS
@@ -211,6 +225,14 @@ select count(*) from functional_orc_def.alltypessmall where float_col < 0
 aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
+# Test on predicate x < a for float that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where float_col < 1
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
 # Test on predicate x > max_val for float. No rows should be read by the ORC reader.
 select count(*) from functional_orc_def.alltypessmall where float_col > 9.9
 ---- RESULTS
@@ -219,6 +241,14 @@ select count(*) from functional_orc_def.alltypessmall where float_col > 9.9
 aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
+# Test on predicate x > a for float that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where float_col > 9
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
 # Test on predicate x < min_val for double. No rows should be read by the ORC reader.
 select count(*) from functional_orc_def.alltypessmall where double_col < 0
 ---- RESULTS
@@ -227,6 +257,14 @@ select count(*) from functional_orc_def.alltypessmall where double_col < 0
 aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
+# Test on predicate x < a for double that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where double_col < 1
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
 # Test on predicate x > max_val for double. No rows should be read by the ORC reader.
 select count(*) from functional_orc_def.alltypessmall where double_col > 99
 ---- RESULTS
@@ -235,6 +273,14 @@ select count(*) from functional_orc_def.alltypessmall where double_col > 99
 aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
+# Test on predicate x > a for double that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where double_col > 90
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
 # Test on predicate x < min_val for string. No rows should be read by the ORC reader.
 select count(*) from functional_orc_def.alltypessmall where string_col < "0"
 ---- RESULTS
@@ -480,6 +526,14 @@ aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
 select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col <= '0001-06-19';
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 10
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
 where date_part in ("2017-11-27", "1399-06-27") and date_col > '2018-12-31';
 ---- RESULTS
 0
@@ -487,6 +541,22 @@ where date_part in ("2017-11-27", "1399-06-27") and date_col > '2018-12-31';
 aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col >= '2018-12-31';
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col = '2018-12-31';
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
 # Test predicates on CHAR type. They are not pushed down (IMPALA-10882). Just make sure
 # we don't hit DCHECKs.
 select count(*) from functional_orc_def.chars_tiny where cs < cast('1aaaa' as char(5));
@@ -540,3 +610,254 @@ select count(*) from tpch_orc_def.lineitem where l_orderkey = 1609411;
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 13501
 ====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col in (-1, 10);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col in (1, 5);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where smallint_col in (-1, 10);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where smallint_col in (1, 5);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where int_col in (-1, 10);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where int_col in (1, 5);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+# The table has 4 files. 3 of them will be skipped.
+select count(*) from functional_orc_def.alltypessmall where id in (1, 3, 5);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 25
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where bigint_col in (-1, 91);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where bigint_col in (10, 50);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col in (true);
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col in (false);
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+# The range of string_col is ['0', '9']. '/' is smaller than '0' and 'A' is larger
+# than '9'.
+select count(*) from functional_orc_def.alltypessmall where string_col in ('/', 'A');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where string_col in ('1', '5');
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col in ('0001-06-18', '2019-12-31');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col in ('0001-06-19', '2018-12-31');
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 13
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range. Explicitly cast
+# the literals so the FE won't wrap 'd1' in a cast to INT, so the predicate can be pushed down.
+# Because ORC-517 is not included in the current Hive version (3.1.3000.7.2.12.0-104),
+# the ORC files have wrong statistics on the d1 column, showing that its minimum is 0. So here
+# we use -1 to be smaller than it. The max of d1 is 132842, so we use 132843.
+select count(*) from functional_orc_def.decimal_tbl
+where d1 in (cast(-1 as decimal(9,0)), cast(132843 as decimal(9,0)));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d1 in (cast(0 as decimal(9,0)), cast(132842 as decimal(9,0)));
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d1 in (cast(1234 as decimal(9,0)), cast(2345 as decimal(9,0)), cast(12345 as decimal(9,0)));
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d3 in (cast(-1 as decimal(20,10)), cast(12346 as decimal(20,10)));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d3 in (cast(2 as decimal(20,10)), cast(200 as decimal(20,10)));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d3 in (cast(12.3456789 as decimal(20,10)), cast(12345.6789 as decimal(20,10)));
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where bool_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where tinyint_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where smallint_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where int_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where bigint_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where float_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where double_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where string_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where timestamp_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d1 is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d4 is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====