Posted to commits@impala.apache.org by jo...@apache.org on 2021/10/21 17:32:38 UTC
[impala] 02/04: IMPALA-10873: Push down EQUALS, IS NULL and IN-list predicate to ORC reader
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit c127b6b1a72d83778378dc181d9328b4cb7dea9e
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Aug 27 15:39:52 2021 +0800
IMPALA-10873: Push down EQUALS, IS NULL and IN-list predicate to ORC reader
This patch pushes down more kinds of predicates into the ORC reader,
including EQUALS, IN-list, and IS-NULL predicates, bringing the
following improvements:
- EQUALS and IN-list predicates can be evaluated inside the ORC reader
with bloom filters in the ORC files.
- Unlike the Parquet scanner, which converts an IN-list predicate
into two binary predicates (i.e. LE and GE), the ORC reader can
leverage IN-list predicates directly to skip ORC RowGroups. E.g. a
RowGroup whose int column 'x' has values in range [1, 100] will be
skipped if we push down the predicate "x in (0, 101)".
- IS-NULL predicates (including IS-NOT-NULL) can also be used in the
ORC reader to skip RowGroups.
Implementation:
FE will collect these kinds of predicates into 'min_max_conjuncts' of
THdfsScanNode. To better reflect the meaning, 'min_max_conjuncts' is
renamed to 'stats_conjuncts'. Same for other related variable names.
The Parquet scanner will only pick binary min-max conjuncts (i.e. LT, GT,
LE, and GE) to keep the existing behavior. The ORC scanner will build a
SearchArgument based on all of these conjuncts.
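For illustration, here is a minimal sketch (not part of this patch) of how
a SearchArgument equivalent to "x in (0, 101) and x is not null" could be
assembled with the ORC C++ builder API. The column id (1) and the LONG
predicate type are assumptions for the example; the builder calls mirror
those in hdfs-orc-scanner.cc, plus the final build() from the ORC library:

  // Illustrative sketch only: build a SearchArgument for
  // "x IN (0, 101) AND x IS NOT NULL", assuming column 'x' has ORC
  // column id 1 and maps to orc::PredicateDataType::LONG.
  #include <memory>
  #include <vector>
  #include <orc/sargs/SearchArgument.hh>

  std::unique_ptr<orc::SearchArgument> BuildExampleSarg() {
    std::unique_ptr<orc::SearchArgumentBuilder> builder =
        orc::SearchArgumentFactory::newBuilder();
    // IN-list literals; orc::Literal(int64_t) creates a LONG literal.
    std::vector<orc::Literal> in_list;
    in_list.emplace_back(static_cast<int64_t>(0));
    in_list.emplace_back(static_cast<int64_t>(101));
    builder->in(1, orc::PredicateDataType::LONG, in_list);  // x IN (0, 101)
    builder->startNot()                                     // IS NOT NULL is
        .isNull(1, orc::PredicateDataType::LONG)            // expressed as
        .end();                                             // NOT (IS NULL)
    return builder->build();
  }

The reader evaluates the built SearchArgument against each RowGroup's
statistics (and, for EQUALS/IN-list, its bloom filters) and skips RowGroups
whose result is definitely false.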
Tests
* Add a new test table 'alltypessmall_bool_sorted' which has files
containing sorted bool values.
* Add tests in orc-stats.test
Change-Id: Iaa89f080fe2e87d94fc8ea7f1be83e087fa34225
Reviewed-on: http://gerrit.cloudera.org:8080/17815
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Qifan Chen <qc...@cloudera.com>
---
be/src/exec/hdfs-orc-scanner.cc | 167 +++++++----
be/src/exec/hdfs-orc-scanner.h | 22 +-
be/src/exec/hdfs-scan-node-base.cc | 46 +--
be/src/exec/hdfs-scan-node-base.h | 22 +-
be/src/exec/parquet/hdfs-parquet-scanner.cc | 53 ++--
be/src/exec/parquet/hdfs-parquet-scanner.h | 18 +-
bin/impala-config.sh | 4 +-
common/thrift/PlanNodes.thrift | 10 +-
.../org/apache/impala/planner/HdfsScanNode.java | 208 ++++++++-----
.../functional/functional_schema_template.sql | 29 ++
.../datasets/functional/schema_constraints.csv | 3 +
.../queries/QueryTest/orc-stats.test | 321 +++++++++++++++++++++
12 files changed, 694 insertions(+), 209 deletions(-)
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index b02908c..3ac7361 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -301,7 +301,7 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
orc_root_batch_.reset(nullptr);
search_args_pool_->FreeAll();
- ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
+ ScalarExprEvaluator::Close(stats_conjunct_evals_, state_);
// Verify all resources (if any) have been transferred.
DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
@@ -953,9 +953,10 @@ orc::PredicateDataType HdfsOrcScanner::GetOrcPredicateDataType(const ColumnType&
}
orc::Literal HdfsOrcScanner::GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
- const ColumnType& dst_type, orc::PredicateDataType* predicate_type) {
- DCHECK(eval->root().GetNumChildren() == 2);
- ScalarExpr* literal_expr = eval->root().GetChild(1);
+ int child_idx, const ColumnType& dst_type, orc::PredicateDataType* predicate_type) {
+ DCHECK_GE(child_idx, 1);
+ DCHECK_LT(child_idx, eval->root().GetNumChildren());
+ ScalarExpr* literal_expr = eval->root().GetChild(child_idx);
const ColumnType& type = literal_expr->type();
DCHECK(literal_expr->IsLiteral());
*predicate_type = GetOrcPredicateDataType(type);
@@ -1049,79 +1050,141 @@ orc::Literal HdfsOrcScanner::GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
}
}
+bool HdfsOrcScanner::PrepareBinaryPredicate(const string& fn_name, uint64_t orc_column_id,
+ const ColumnType& type, ScalarExprEvaluator* eval,
+ orc::SearchArgumentBuilder* sarg) {
+ orc::PredicateDataType predicate_type;
+ orc::Literal literal = GetSearchArgumentLiteral(eval, /*child_idx*/1, type,
+ &predicate_type);
+ if (literal.isNull()) {
+ VLOG_FILE << "Failed to push down predicate " << eval->root().DebugString();
+ return false;
+ }
+ if (fn_name == "lt") {
+ sarg->lessThan(orc_column_id, predicate_type, literal);
+ } else if (fn_name == "gt") {
+ sarg->startNot()
+ .lessThanEquals(orc_column_id, predicate_type, literal)
+ .end();
+ } else if (fn_name == "le") {
+ sarg->lessThanEquals(orc_column_id, predicate_type, literal);
+ } else if (fn_name == "ge") {
+ sarg->startNot()
+ .lessThan(orc_column_id, predicate_type, literal)
+ .end();
+ } else if (fn_name == "eq") {
+ sarg->equals(orc_column_id, predicate_type, literal);
+ } else {
+ return false;
+ }
+ return true;
+}
+
+bool HdfsOrcScanner::PrepareInListPredicate(uint64_t orc_column_id,
+ const ColumnType& type, ScalarExprEvaluator* eval,
+ orc::SearchArgumentBuilder* sarg) {
+ std::vector<orc::Literal> in_list;
+ // Initialize 'predicate_type' to avoid clang-tidy warning.
+ orc::PredicateDataType predicate_type = orc::PredicateDataType::BOOLEAN;
+ for (int i = 1; i < eval->root().children().size(); ++i) {
+ // ORC reader only supports pushing down predicates whose constant parts are literal.
+ // FE shouldn't push down any non-literal expr here.
+ DCHECK(eval->root().GetChild(i)->IsLiteral())
+ << "Non-literal constant expr cannot be used";
+ in_list.emplace_back(GetSearchArgumentLiteral(eval, i, type, &predicate_type));
+ }
+ // The ORC library requires that an IN-list have at least 2 literals, so convert
+ // it to EQUALS when there is only one.
+ if (in_list.size() == 1) {
+ sarg->equals(orc_column_id, predicate_type, in_list[0]);
+ } else if (in_list.size() > 1) {
+ sarg->in(orc_column_id, predicate_type, in_list);
+ } else {
+ DCHECK(false) << "Empty IN-list should cause syntax error";
+ return false;
+ }
+ return true;
+}
+
+void HdfsOrcScanner::PrepareIsNullPredicate(bool is_not_null, uint64_t orc_column_id,
+ const ColumnType& type, orc::SearchArgumentBuilder* sarg) {
+ orc::PredicateDataType orc_type = GetOrcPredicateDataType(type);
+ if (is_not_null) {
+ sarg->startNot()
+ .isNull(orc_column_id, orc_type)
+ .end();
+ } else {
+ sarg->isNull(orc_column_id, orc_type);
+ }
+}
+
Status HdfsOrcScanner::PrepareSearchArguments() {
if (!state_->query_options().orc_read_statistics) return Status::OK();
- const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
- if (!min_max_tuple_desc) return Status::OK();
+ const TupleDescriptor* stats_tuple_desc = scan_node_->stats_tuple_desc();
+ if (!stats_tuple_desc) return Status::OK();
// Clone the min/max statistics conjuncts.
RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
expr_perm_pool_.get(), context_->expr_results_pool(),
- scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
+ scan_node_->stats_conjunct_evals(), &stats_conjunct_evals_));
std::unique_ptr<orc::SearchArgumentBuilder> sarg =
orc::SearchArgumentFactory::newBuilder();
bool sargs_supported = false;
+ const orc::Type* node = nullptr;
+ bool pos_field;
+ bool missing_field;
- DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
- for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
- SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
- ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
+ DCHECK_EQ(stats_tuple_desc->slots().size(), stats_conjunct_evals_.size());
+ for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
+ SlotDescriptor* slot_desc = stats_tuple_desc->slots()[i];
+ // Resolve column path to determine col idx in file schema.
+ RETURN_IF_ERROR(schema_resolver_->ResolveColumn(slot_desc->col_path(),
+ &node, &pos_field, &missing_field));
+ if (pos_field || missing_field) continue;
+
+ ScalarExprEvaluator* eval = stats_conjunct_evals_[i];
+ const string& fn_name = eval->root().function_name();
+ if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+ PrepareIsNullPredicate(fn_name == "is_not_null_pred", node->getColumnId(),
+ slot_desc->type(), sarg.get());
+ sargs_supported = true;
+ continue;
+ }
ScalarExpr* const_expr = eval->root().GetChild(1);
- // ORC reader only supports pushing down predicates that constant parts are literal.
+ // ORC reader only supports pushing down predicates whose constant parts are literal.
// We could get non-literal expr if expr rewrites are disabled.
if (!const_expr->IsLiteral()) continue;
- // TODO(IMPALA-10882): push down min-max predicates on CHAR/VARCHAR.
- if (const_expr->type().type == TYPE_CHAR || const_expr->type().type == TYPE_VARCHAR
- || slot_desc->type().type == TYPE_CHAR
- || slot_desc->type().type == TYPE_VARCHAR) {
- continue;
- }
+ // TODO: push down stats predicates on CHAR/VARCHAR(IMPALA-10882) and
+ // TIMESTAMP(IMPALA-10915) to ORC reader
+ DCHECK(const_expr->type().type != TYPE_CHAR);
+ DCHECK(const_expr->type().type != TYPE_VARCHAR);
+ DCHECK(const_expr->type().type != TYPE_TIMESTAMP);
+ DCHECK(slot_desc->type().type != TYPE_CHAR);
+ DCHECK(slot_desc->type().type != TYPE_VARCHAR);
+ DCHECK(slot_desc->type().type != TYPE_TIMESTAMP) << "FE should skip such predicates";
// TODO(IMPALA-10916): dealing with lhs that is a simple cast expr.
if (GetOrcPredicateDataType(slot_desc->type()) !=
GetOrcPredicateDataType(const_expr->type())) {
continue;
}
-
- //Resolve column path to determine col idx.
- const orc::Type* node = nullptr;
- bool pos_field;
- bool missing_field;
- RETURN_IF_ERROR(schema_resolver_->ResolveColumn(slot_desc->col_path(),
- &node, &pos_field, &missing_field));
-
- if (pos_field || missing_field) continue;
- // TODO(IMPALA-10882): push down min-max predicates on CHAR/VARCHAR.
- if (node->getKind() == orc::CHAR || node->getKind() == orc::VARCHAR) continue;
-
- const string& fn_name = eval->root().function_name();
- orc::PredicateDataType predicate_type;
- orc::Literal literal =
- GetSearchArgumentLiteral(eval, slot_desc->type(), &predicate_type);
- if (literal.isNull()) {
- VLOG_QUERY << "Failed to push down predicate " << eval->root().DebugString();
+ // Skip if the file schema contains unsupported types.
+ // TODO: push down stats predicates on CHAR/VARCHAR(IMPALA-10882) and
+ // TIMESTAMP(IMPALA-10915) to ORC reader
+ if (node->getKind() == orc::CHAR
+ || node->getKind() == orc::VARCHAR
+ || node->getKind() == orc::TIMESTAMP) {
continue;
}
- if (fn_name == "lt") {
- sarg->lessThan(node->getColumnId(), predicate_type, literal);
- } else if (fn_name == "gt") {
- sarg->startNot()
- .lessThanEquals(node->getColumnId(), predicate_type, literal)
- .end();
- } else if (fn_name == "le") {
- sarg->lessThanEquals(node->getColumnId(), predicate_type, literal);
- } else if (fn_name == "ge") {
- sarg->startNot()
- .lessThan(node->getColumnId(), predicate_type, literal)
- .end();
- } else {
- DCHECK(false) << "Invalid predicate: " << fn_name;
+ if (fn_name == "in_iterate" || fn_name == "in_set_lookup") {
+ sargs_supported |= PrepareInListPredicate(
+ node->getColumnId(), slot_desc->type(), eval, sarg.get());
continue;
}
- // If we have reached this far, we have a valid search arg that we can build later.
- sargs_supported = true;
+ sargs_supported |= PrepareBinaryPredicate(fn_name, node->getColumnId(),
+ slot_desc->type(), eval, sarg.get());
}
if (sargs_supported) {
try {
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 6f2aacf..0e7c0cb 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -180,9 +180,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
/// Pool to copy values into when building search arguments. Freed on Close().
boost::scoped_ptr<MemPool> search_args_pool_;
- /// Clone of Min/max statistics conjunct evaluators. Has the same lifetime as
- /// the scanner. Stored in 'obj_pool_'.
- vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
+ /// Clone of statistics conjunct evaluators. Has the same lifetime as the scanner.
+ /// Stored in 'obj_pool_'.
+ vector<ScalarExprEvaluator*> stats_conjunct_evals_;
std::unique_ptr<OrcSchemaResolver> schema_resolver_ = nullptr;
@@ -318,7 +318,15 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
void SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor* slot_desc,
Tuple* template_tuple);
- /// Clones the min/max conjucts into min_max_conjunct_evals_, then builds ORC search
+ bool PrepareBinaryPredicate(const string& fn_name, uint64_t orc_column_id,
+ const ColumnType& type, ScalarExprEvaluator* eval,
+ orc::SearchArgumentBuilder* sarg);
+ bool PrepareInListPredicate(uint64_t orc_column_id, const ColumnType& type,
+ ScalarExprEvaluator* eval, orc::SearchArgumentBuilder* sarg);
+ void PrepareIsNullPredicate(bool is_not_null, uint64_t orc_column_id,
+ const ColumnType& type, orc::SearchArgumentBuilder* sarg);
+
+ /// Clones the stats conjuncts into stats_conjunct_evals_, then builds ORC search
/// arguments from the conjuncts. The search arguments will exist for the lifespan of
/// the scanner and need not to be updated.
Status PrepareSearchArguments() WARN_UNUSED_RESULT;
@@ -333,9 +341,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
/// Helper function for mapping ColumnType to orc::PredicateDataType.
static orc::PredicateDataType GetOrcPredicateDataType(const ColumnType& type);
- /// Returns the literal from the min/max conjucts, with the assumption that the
- /// evaluator has exactly two children, where the second is a literal.
- orc::Literal GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
+ /// Returns the literal specified by 'child_idx' from the stats conjunct 'eval',
+ /// with the assumption that the specified child is a literal.
+ orc::Literal GetSearchArgumentLiteral(ScalarExprEvaluator* eval, int child_idx,
const ColumnType& dst_type, orc::PredicateDataType* predicate_type);
};
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index a3a6338..da9f110 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -213,15 +213,15 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
DCHECK(conjuncts_map_[tuple_id].empty());
conjuncts_map_[tuple_id] = conjuncts_;
- // Add min max conjuncts
- if (tnode.hdfs_scan_node.__isset.min_max_tuple_id) {
- TupleDescriptor* min_max_tuple_desc =
- state->desc_tbl().GetTupleDescriptor(tnode.hdfs_scan_node.min_max_tuple_id);
- DCHECK(min_max_tuple_desc != nullptr);
- RowDescriptor* min_max_row_desc = state->obj_pool()->Add(
- new RowDescriptor(min_max_tuple_desc, /* is_nullable */ false));
- RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
- *min_max_row_desc, state, &min_max_conjuncts_));
+ // Add stats conjuncts
+ if (tnode.hdfs_scan_node.__isset.stats_tuple_id) {
+ TupleDescriptor* stats_tuple_desc =
+ state->desc_tbl().GetTupleDescriptor(tnode.hdfs_scan_node.stats_tuple_id);
+ DCHECK(stats_tuple_desc != nullptr);
+ RowDescriptor* stats_row_desc = state->obj_pool()->Add(
+ new RowDescriptor(stats_tuple_desc, /* is_nullable */ false));
+ RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.stats_conjuncts,
+ *stats_row_desc, state, &stats_conjuncts_));
}
// Transfer overlap predicate descs.
@@ -372,7 +372,7 @@ void HdfsScanPlanNode::Close() {
if (tid_conjunct.first == tuple_id) continue;
ScalarExpr::Close(tid_conjunct.second);
}
- ScalarExpr::Close(min_max_conjuncts_);
+ ScalarExpr::Close(stats_conjuncts_);
if (shared_state_.template_pool_.get() != nullptr) {
shared_state_.template_pool_->FreeAll();
}
@@ -392,11 +392,11 @@ Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) co
HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pnode,
const THdfsScanNode& hdfs_scan_node, const DescriptorTbl& descs)
: ScanNode(pool, pnode, descs),
- min_max_tuple_id_(
- hdfs_scan_node.__isset.min_max_tuple_id ? hdfs_scan_node.min_max_tuple_id : -1),
- min_max_conjuncts_(pnode.min_max_conjuncts_),
- min_max_tuple_desc_(
- min_max_tuple_id_ == -1 ? nullptr : descs.GetTupleDescriptor(min_max_tuple_id_)),
+ stats_tuple_id_(
+ hdfs_scan_node.__isset.stats_tuple_id ? hdfs_scan_node.stats_tuple_id : -1),
+ stats_conjuncts_(pnode.stats_conjuncts_),
+ stats_tuple_desc_(
+ stats_tuple_id_ == -1 ? nullptr : descs.GetTupleDescriptor(stats_tuple_id_)),
skip_header_line_count_(hdfs_scan_node.__isset.skip_header_line_count ?
hdfs_scan_node.skip_header_line_count :
0),
@@ -441,10 +441,10 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
}
}
- // Prepare min max statistics conjuncts.
- if (min_max_tuple_id_ != -1) {
- RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_,
- expr_perm_pool(), expr_results_pool(), &min_max_conjunct_evals_));
+ // Prepare stats conjuncts.
+ if (stats_tuple_id_ != -1) {
+ RETURN_IF_ERROR(ScalarExprEvaluator::Create(stats_conjuncts_, state, pool_,
+ expr_perm_pool(), expr_results_pool(), &stats_conjunct_evals_));
}
// Check if reservation was enough to allocate at least one buffer. The
@@ -521,8 +521,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
RETURN_IF_ERROR(ScalarExprEvaluator::Open(entry.second, state));
}
- // Open min max conjuncts
- RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state));
+ // Open stats conjuncts
+ RETURN_IF_ERROR(ScalarExprEvaluator::Open(stats_conjunct_evals_, state));
RETURN_IF_ERROR(ClaimBufferReservation(state));
reader_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext();
@@ -630,8 +630,8 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
ScalarExprEvaluator::Close(tid_conjunct_eval.second, state);
}
- // Close min max conjunct
- ScalarExprEvaluator::Close(min_max_conjunct_evals_, state);
+ // Close stats conjunct
+ ScalarExprEvaluator::Close(stats_conjunct_evals_, state);
ScanNode::Close(state);
}
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 4036bd9..e3d2ee8 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -276,13 +276,13 @@ class HdfsScanPlanNode : public ScanPlanNode {
typedef std::unordered_map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap;
ConjunctsMap conjuncts_map_;
- /// Conjuncts to evaluate on parquet::Statistics.
- std::vector<ScalarExpr*> min_max_conjuncts_;
+ /// Conjuncts to evaluate on parquet::Statistics or to be pushed down to ORC reader.
+ std::vector<ScalarExpr*> stats_conjuncts_;
/// The list of overlap predicate descs. Each is used to find out whether the range
/// of values of a data item overlap with a min/max filter. Data structure wise, each
/// desc is composed of the ID of the min/max filter, the slot index in
- /// 'min_max_tuple_desc_' to hold the min value of the data item and the actual overlap
+ /// 'stats_tuple_desc_' to hold the min value of the data item and the actual overlap
/// predicate. The next slot after the slot index implicitly holds the max value of
/// the data item.
std::vector<TOverlapPredicateDesc> overlap_predicate_descs_;
@@ -418,13 +418,13 @@ class HdfsScanNodeBase : public ScanNode {
/// Returns number of partition key slots.
int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
- int min_max_tuple_id() const { return min_max_tuple_id_; }
+ int stats_tuple_id() const { return stats_tuple_id_; }
- const std::vector<ScalarExprEvaluator*>& min_max_conjunct_evals() const {
- return min_max_conjunct_evals_;
+ const std::vector<ScalarExprEvaluator*>& stats_conjunct_evals() const {
+ return stats_conjunct_evals_;
}
- const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; }
+ const TupleDescriptor* stats_tuple_desc() const { return stats_tuple_desc_; }
const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
const AvroSchemaElement& avro_schema() const { return avro_schema_; }
@@ -626,14 +626,14 @@ class HdfsScanNodeBase : public ScanNode {
friend class HdfsScanner;
/// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
- const int min_max_tuple_id_;
+ const int stats_tuple_id_;
/// Conjuncts to evaluate on parquet::Statistics.
- const vector<ScalarExpr*>& min_max_conjuncts_;
- vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
+ const vector<ScalarExpr*>& stats_conjuncts_;
+ vector<ScalarExprEvaluator*> stats_conjunct_evals_;
/// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics.
- TupleDescriptor* min_max_tuple_desc_ = nullptr;
+ TupleDescriptor* stats_tuple_desc_ = nullptr;
// Number of header lines to skip at the beginning of each file of this table. Only set
// to values > 0 for hdfs text files.
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index d1df159..b1f0fdb 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -168,7 +168,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));
// Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
- const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+ const TupleDescriptor* min_max_tuple_desc = scan_node_->stats_tuple_desc();
if (min_max_tuple_desc != nullptr) {
int64_t tuple_size = min_max_tuple_desc->byte_size();
uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
@@ -183,7 +183,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
// Clone the min/max statistics conjuncts.
RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
expr_perm_pool_.get(), context_->expr_results_pool(),
- scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
+ scan_node_->stats_conjunct_evals(), &stats_conjunct_evals_));
for (int i = 0; i < context->filter_ctxs().size(); ++i) {
const FilterContext* ctx = &context->filter_ctxs()[i];
@@ -293,7 +293,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
}
if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
- ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
+ ScalarExprEvaluator::Close(stats_conjunct_evals_, state_);
for (int i = 0; i < filter_ctxs_.size(); ++i) {
const FilterStats* stats = filter_ctxs_[i]->stats;
@@ -548,7 +548,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
if (!state_->query_options().parquet_read_statistics) return Status::OK();
- const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+ const TupleDescriptor* min_max_tuple_desc = scan_node_->stats_tuple_desc();
if (!min_max_tuple_desc) return Status::OK();
int64_t tuple_size = min_max_tuple_desc->byte_size();
@@ -556,12 +556,13 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
DCHECK(min_max_tuple_ != nullptr);
min_max_tuple_->Init(tuple_size);
- DCHECK_GE(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
- for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
+ DCHECK_GE(min_max_tuple_desc->slots().size(), stats_conjunct_evals_.size());
+ for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
- ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
-
+ ScalarExprEvaluator* eval = stats_conjunct_evals_[i];
+ const string& fn_name = eval->root().function_name();
+ if (!IsSupportedStatsConjunct(fn_name)) continue;
bool missing_field = false;
SchemaNode* node = nullptr;
@@ -585,7 +586,6 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
break;
}
- const string& fn_name = eval->root().function_name();
ColumnStatsReader::StatsField stats_field;
if (!ColumnStatsReader::GetRequiredStatsField(fn_name, &stats_field)) continue;
@@ -647,21 +647,21 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
if (!state_->query_options().parquet_read_statistics) return Status::OK();
- const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
- if (GetOverlapPredicateDescs().size() > 0 && !min_max_tuple_desc) {
+ const TupleDescriptor* stats_tuple_desc = scan_node_->stats_tuple_desc();
+ if (GetOverlapPredicateDescs().size() > 0 && !stats_tuple_desc) {
stringstream err;
- err << "min_max_tuple_desc is null.";
+ err << "stats_tuple_desc is null.";
DCHECK(false) << err.str();
return Status(err.str());
}
DCHECK(min_max_tuple_ != nullptr);
- min_max_tuple_->Init(min_max_tuple_desc->byte_size());
+ min_max_tuple_->Init(stats_tuple_desc->byte_size());
// The total number slots in min_max_tuple_ should be equal to or larger than
// the number of min/max conjuncts. The extra number slots are for the overlap
// predicates. # min_max_conjuncts + 2 * # overlap predicates = # min_max_slots.
- DCHECK_GE(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
+ DCHECK_GE(stats_tuple_desc->slots().size(), stats_conjunct_evals_.size());
TMinmaxFilteringLevel::type level = state_->query_options().minmax_filtering_level;
float threshold = (float)(state_->query_options().minmax_filter_threshold);
@@ -703,7 +703,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
continue;
}
- SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[slot_idx];
+ SlotDescriptor* slot_desc = stats_tuple_desc->slots()[slot_idx];
bool missing_field = false;
SchemaNode* node = nullptr;
@@ -800,7 +800,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
bool HdfsParquetScanner::ShouldProcessPageIndex() {
if (!state_->query_options().parquet_read_page_index) return false;
- if (!min_max_conjunct_evals_.empty()) return true;
+ if (!stats_conjunct_evals_.empty()) return true;
for (auto desc : GetOverlapPredicateDescs()) {
if (IsFilterWorthyForOverlapCheck(FindFilterIndex(desc.filter_id))) {
return true;
@@ -1294,9 +1294,9 @@ void HdfsParquetScanner::GetMinMaxSlotsForOverlapPred(
DCHECK(min_slot);
DCHECK(max_slot);
SlotDescriptor* min_slot_desc =
- scan_node_->min_max_tuple_desc()->slots()[overlap_slot_idx];
+ scan_node_->stats_tuple_desc()->slots()[overlap_slot_idx];
SlotDescriptor* max_slot_desc =
- scan_node_->min_max_tuple_desc()->slots()[overlap_slot_idx + 1];
+ scan_node_->stats_tuple_desc()->slots()[overlap_slot_idx + 1];
*min_slot = min_max_tuple_->GetSlot(min_slot_desc->tuple_offset());
*max_slot = min_max_tuple_->GetSlot(max_slot_desc->tuple_offset());
}
@@ -1390,10 +1390,10 @@ void HdfsParquetScanner::CollectSkippedPageRangesForSortedColumn(
Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
vector<RowRange>* skip_ranges) {
DCHECK(skip_ranges);
- const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+ const TupleDescriptor* min_max_tuple_desc = scan_node_->stats_tuple_desc();
if (GetOverlapPredicateDescs().size() > 0 && !min_max_tuple_desc) {
stringstream err;
- err << "min_max_tuple_desc is null.";
+ err << "stats_tuple_desc is null.";
DCHECK(false) << err.str();
return Status(err.str());
}
@@ -1495,9 +1495,11 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
vector<RowRange> skip_ranges;
- for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
- ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
- SlotDescriptor* slot_desc = scan_node_->min_max_tuple_desc()->slots()[i];
+ for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
+ ScalarExprEvaluator* eval = stats_conjunct_evals_[i];
+ const string& fn_name = eval->root().function_name();
+ if (!IsSupportedStatsConjunct(fn_name)) continue;
+ SlotDescriptor* slot_desc = scan_node_->stats_tuple_desc()->slots()[i];
bool missing_field = false;
SchemaNode* node = nullptr;
@@ -1518,11 +1520,10 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
parquet::ColumnIndex column_index;
RETURN_IF_ERROR(page_index_.DeserializeColumnIndex(col_chunk, &column_index));
- min_max_tuple_->Init(scan_node_->min_max_tuple_desc()->byte_size());
+ min_max_tuple_->Init(scan_node_->stats_tuple_desc()->byte_size());
void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
const int num_of_pages = column_index.null_pages.size();
- const string& fn_name = eval->root().function_name();
ColumnStatsReader::StatsField stats_field;
if (!ColumnStatsReader::GetRequiredStatsField(fn_name, &stats_field)) continue;
@@ -1888,7 +1889,7 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
unordered_map<int, std::unordered_set<std::pair<std::string, const Literal*>>>
conjunct_halves;
- for (ScalarExprEvaluator* eval : min_max_conjunct_evals_) {
+ for (ScalarExprEvaluator* eval : stats_conjunct_evals_) {
const ScalarExpr& expr = eval->root();
const string& function_name = expr.function_name();
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 05a2552..cab7d60 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -409,9 +409,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
/// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
Tuple* min_max_tuple_;
- /// Clone of Min/max statistics conjunct evaluators. Has the same life time as
- /// the scanner. Stored in 'obj_pool_'.
- vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
+ /// Clone of statistics conjunct evaluators. Has the same life time as the scanner.
+ /// Stored in 'obj_pool_'.
+ vector<ScalarExprEvaluator*> stats_conjunct_evals_;
/// A map from indices of columns that participate in an EQ conjunct to the hash of the
/// literal value of the EQ conjunct. Used in Parquet Bloom filtering.
@@ -555,6 +555,14 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
+ /// Return true if we can evaluate this type of predicate on parquet statistics.
+ /// FE could populate stats-predicates that can't be evaluated here if the table
+ /// contains both Parquet and ORC format partitions. Here we only pick min-max
+ /// predicates, i.e. <, >, <=, and >=.
+ bool IsSupportedStatsConjunct(std::string fn_name) {
+ return fn_name == "lt" || fn_name == "gt" || fn_name == "le" || fn_name == "ge";
+ }
+
/// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
/// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
/// compute the statistics. Sets 'skip_row_group' to true if the row group can be
@@ -677,7 +685,7 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
/// Sets 'filter_pages_' to true if found any page to filter out.
Status ProcessPageIndex();
- /// Evaluates 'min_max_conjunct_evals_' against the column index and determines the row
+ /// Evaluates 'stats_conjunct_evals_' against the column index and determines the row
/// ranges that might contain data we are looking for.
/// Sets 'filter_pages_' to true if found any page to filter out.
Status EvaluatePageIndex();
@@ -864,7 +872,7 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
/// provided buffer must be preallocated to hold at least 'size' bytes.
Status ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64_t size) WARN_UNUSED_RESULT;
- /// Processes 'min_max_conjunct_evals_' to extract equality (EQ) conjuncts. These are
+ /// Processes 'stats_conjunct_evals_' to extract equality (EQ) conjuncts. These are
/// now represented as two conjuncts: an LE and a GE. This function finds such pairs and
/// fills the map 'eq_conjunct_info_' with the hash of the literal in the EQ conjunct.
/// See 'eq_conjunct_info_'.
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 91eb441..6379264 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
# moving to a different build of the toolchain, e.g. when a version is bumped or a
# compile option is changed. The build id can be found in the output of the toolchain
# build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=32-edf8115953
+export IMPALA_TOOLCHAIN_BUILD_ID=47-5e0072f1e0
# Versions of toolchain dependencies.
# -----------------------------------
export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -136,7 +136,7 @@ export IMPALA_OPENLDAP_VERSION=2.4.47
unset IMPALA_OPENLDAP_URL
export IMPALA_OPENSSL_VERSION=1.0.2l
unset IMPALA_OPENSSL_URL
-export IMPALA_ORC_VERSION=2667f2996b75e879e52365edfd06b05da4eda941
+export IMPALA_ORC_VERSION=1.7.0-p3
unset IMPALA_ORC_URL
export IMPALA_PROTOBUF_VERSION=3.5.1
unset IMPALA_PROTOBUF_URL
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 3c48fe8..46db62d 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -304,12 +304,12 @@ struct THdfsScanNode {
// TODO: Remove this option when the MT scan node supports all file formats.
6: optional bool use_mt_scan_node
- // Conjuncts that can be evaluated against parquet::Statistics using the tuple
- // referenced by 'min_max_tuple_id'.
- 7: optional list<Exprs.TExpr> min_max_conjuncts
+ // Conjuncts that can be pushed down to the ORC reader, or be evaluated against
+ // parquet::Statistics using the tuple referenced by 'stats_tuple_id'.
+ 7: optional list<Exprs.TExpr> stats_conjuncts
- // Tuple to evaluate 'min_max_conjuncts' against.
- 8: optional Types.TTupleId min_max_tuple_id
+ // Tuple to evaluate 'stats_conjuncts' against.
+ 8: optional Types.TTupleId stats_tuple_id
// The conjuncts that are eligible for dictionary filtering.
9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index aac9502..b9e84d5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -33,16 +33,14 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.impala.analysis.Analyzer;
-import org.apache.impala.analysis.AlterTableSortByStmt;
import org.apache.impala.analysis.BinaryPredicate;
-import org.apache.impala.analysis.CompoundPredicate;
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.InPredicate;
import org.apache.impala.analysis.IsNotEmptyPredicate;
+import org.apache.impala.analysis.IsNullPredicate;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.MultiAggregateInfo;
-import org.apache.impala.analysis.Path;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotId;
import org.apache.impala.analysis.SlotRef;
@@ -59,11 +57,9 @@ import org.apache.impala.catalog.HdfsCompression;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsPartition.FileBlock;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.PrimitiveType;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
@@ -103,7 +99,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -301,25 +296,25 @@ public class HdfsScanNode extends ScanNode {
private int numFilesNoDiskIds_ = 0;
private int numPartitionsNoDiskIds_ = 0;
- // List of conjuncts for min/max values of parquet::Statistics, that are used to skip
- // data when scanning Parquet files.
- private final List<Expr> minMaxConjuncts_ = new ArrayList<>();
+ // List of conjuncts for min/max values of parquet/orc statistics that are used
+ // to skip data when scanning Parquet/ORC files.
+ private final List<Expr> statsConjuncts_ = new ArrayList<>();
// Map from TupleDescriptor to list of PlanNode conjuncts that have been transformed
- // into conjuncts in 'minMaxConjuncts_'.
- private final Map<TupleDescriptor, List<Expr>> minMaxOriginalConjuncts_ =
+ // into conjuncts in 'statsConjuncts_'.
+ private final Map<TupleDescriptor, List<Expr>> statsOriginalConjuncts_ =
new LinkedHashMap<>();
// Tuple that is used to materialize statistics when scanning Parquet files. For each
// column it can contain 0, 1, or 2 slots, depending on whether the column needs to be
// evaluated against the min and/or the max value of the corresponding
// parquet::Statistics.
- private TupleDescriptor minMaxTuple_;
+ private TupleDescriptor statsTuple_;
// The list of overlap predicate descs. See TOverlapPredicateDesc in PlanNodes.thrift.
private ArrayList<TOverlapPredicateDesc> overlapPredicateDescs_ = new ArrayList<>();
- // Index of the first slot in minMaxTuple_ for overlap predicates.
+ // Index of the first slot in statsTuple_ for overlap predicates.
private int overlap_first_slot_idx_ = -1;
// Slot that is used to record the Parquet metadata for the count(*) aggregation if
@@ -412,7 +407,7 @@ public class HdfsScanNode extends ScanNode {
// Compute min-max conjuncts only if the PARQUET_READ_STATISTICS query option is
// set to true.
if (analyzer.getQueryOptions().parquet_read_statistics) {
- computeMinMaxTupleAndConjuncts(analyzer);
+ computeStatsTupleAndConjuncts(analyzer);
}
// Compute dictionary conjuncts only if the PARQUET_DICTIONARY_FILTERING query
// option is set to true.
@@ -425,7 +420,7 @@ public class HdfsScanNode extends ScanNode {
// Compute min-max conjuncts only if the ORC_READ_STATISTICS query option is
// set to true.
if (analyzer.getQueryOptions().orc_read_statistics) {
- computeMinMaxTupleAndConjuncts(analyzer);
+ computeStatsTupleAndConjuncts(analyzer);
}
}
@@ -526,25 +521,60 @@ public class HdfsScanNode extends ScanNode {
/**
* Builds a predicate to evaluate against parquet::Statistics by copying 'inputSlot'
- * into 'minMaxTuple_', combining 'inputSlot', 'inputPred' and 'op' into a new
- * predicate, and adding it to 'minMaxConjuncts_'.
+ * into 'statsTuple_', combining 'inputSlot', 'inputPred' and 'op' into a new
+ * predicate, and adding it to 'statsConjuncts_'.
*/
- private void buildStatsPredicate(Analyzer analyzer, SlotRef inputSlot,
+ private void buildBinaryStatsPredicate(Analyzer analyzer, SlotRef inputSlot,
BinaryPredicate inputPred, BinaryPredicate.Operator op) {
// Obtain the rhs expr of the input predicate
Expr constExpr = inputPred.getChild(1);
Preconditions.checkState(constExpr.isConstant());
// Make a new slot descriptor, which adds it to the tuple descriptor.
- SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(minMaxTuple_,
+ SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(statsTuple_,
inputSlot.getDesc());
SlotRef slot = new SlotRef(slotDesc);
BinaryPredicate statsPred = new BinaryPredicate(op, slot, constExpr);
statsPred.analyzeNoThrow(analyzer);
- minMaxConjuncts_.add(statsPred);
+ statsConjuncts_.add(statsPred);
}
- private void tryComputeBinaryMinMaxPredicate(Analyzer analyzer,
+ /**
+ * Similar to the above method but builds an IN-list predicate which can be pushed down
+ * to the ORC reader.
+ */
+ private void buildInListStatsPredicate(Analyzer analyzer, SlotRef inputSlot,
+ InPredicate inputPred) {
+ Preconditions.checkState(!inputPred.isNotIn());
+ List<Expr> children = inputPred.getChildren();
+ Preconditions.checkState(inputSlot == children.get(0).unwrapSlotRef(true));
+ List<Expr> inList = Lists.newArrayListWithCapacity(children.size() - 1);
+ for (int i = 1; i < children.size(); ++i) {
+ Expr child = children.get(i);
+ // If any child is not a literal, then nothing can be done
+ if (!Expr.IS_LITERAL.apply(child)) return;
+ if (isUnsupportedStatsType(child.getType())) return;
+ inList.add(child);
+ }
+ // Make a new slot descriptor, which adds it to the tuple descriptor.
+ SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(statsTuple_,
+ inputSlot.getDesc());
+ SlotRef slot = new SlotRef(slotDesc);
+ InPredicate inPred = new InPredicate(slot, inList, inputPred.isNotIn());
+ inPred.analyzeNoThrow(analyzer);
+ statsConjuncts_.add(inPred);
+ }
+
+ private boolean isUnsupportedStatsType(Type type) {
+ // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
+ // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
+ return fileFormats_.contains(HdfsFileFormat.ORC)
+ && (type.getPrimitiveType() == PrimitiveType.CHAR
+ || type.getPrimitiveType() == PrimitiveType.VARCHAR
+ || type.getPrimitiveType() == PrimitiveType.TIMESTAMP);
+ }
+
+ private void tryComputeBinaryStatsPredicate(Analyzer analyzer,
BinaryPredicate binaryPred) {
// We only support slot refs on the left hand side of the predicate, a rewriting
// rule makes sure that all compatible exprs are rewritten into this form. Only
@@ -563,26 +593,29 @@ public class HdfsScanNode extends ScanNode {
// LiteralExpr, but can also be an expr like "1 + 2".
if (!constExpr.isConstant()) return;
if (Expr.IS_NULL_VALUE.apply(constExpr)) return;
+ if (isUnsupportedStatsType(slotDesc.getType())) return;
+ if (isUnsupportedStatsType(constExpr.getType())) return;
- // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
- // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
- if (fileFormats_.contains(HdfsFileFormat.ORC) &&
- (slotDesc.getType() == Type.CHAR || slotDesc.getType() == Type.VARCHAR ||
- slotDesc.getType() == Type.TIMESTAMP)) {
- return;
- }
if (BinaryPredicate.IS_RANGE_PREDICATE.apply(binaryPred)) {
- addMinMaxOriginalConjunct(slotDesc.getParent(), binaryPred);
- buildStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
+ addStatsOriginalConjunct(slotDesc.getParent(), binaryPred);
+ buildBinaryStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
} else if (BinaryPredicate.IS_EQ_PREDICATE.apply(binaryPred)) {
- addMinMaxOriginalConjunct(slotDesc.getParent(), binaryPred);
- // TODO: this could be optimized for boolean columns.
- buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.LE);
- buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.GE);
+ addStatsOriginalConjunct(slotDesc.getParent(), binaryPred);
+ if (hasParquet(fileFormats_)) {
+ // TODO: this could be optimized for boolean columns.
+ buildBinaryStatsPredicate(analyzer, slotRef, binaryPred,
+ BinaryPredicate.Operator.LE);
+ buildBinaryStatsPredicate(analyzer, slotRef, binaryPred,
+ BinaryPredicate.Operator.GE);
+ }
+ if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+ // We can push down EQ predicates to the ORC reader directly.
+ buildBinaryStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
+ }
}
}
- private void tryComputeInListMinMaxPredicate(Analyzer analyzer, InPredicate inPred) {
+ private void tryComputeInListStatsPredicate(Analyzer analyzer, InPredicate inPred) {
// Retrieve the left side of the IN predicate. It must be a simple slot to proceed.
SlotRef slotRef = inPred.getBoundSlot();
if (slotRef == null) return;
@@ -591,15 +624,14 @@ public class HdfsScanNode extends ScanNode {
Preconditions.checkState(slotDesc.isScanSlot());
// Skip the slot ref if it refers to an array's "pos" field.
if (slotDesc.isArrayPosRef()) return;
- // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
- // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
- if (fileFormats_.contains(HdfsFileFormat.ORC) &&
- (slotDesc.getType() == Type.CHAR || slotDesc.getType() == Type.VARCHAR ||
- slotDesc.getType() == Type.TIMESTAMP)) {
- return;
- }
if (inPred.isNotIn()) return;
+ if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+ if (isUnsupportedStatsType(slotDesc.getType())) return;
+ addStatsOriginalConjunct(slotDesc.getParent(), inPred);
+ buildInListStatsPredicate(analyzer, slotRef, inPred);
+ }
+ if (!hasParquet(fileFormats_)) return;
List<Expr> children = inPred.getChildren();
LiteralExpr min = null;
LiteralExpr max = null;
@@ -623,25 +655,44 @@ public class HdfsScanNode extends ScanNode {
BinaryPredicate maxBound = new BinaryPredicate(BinaryPredicate.Operator.LE,
children.get(0).clone(), max.clone());
- addMinMaxOriginalConjunct(slotDesc.getParent(), inPred);
- buildStatsPredicate(analyzer, slotRef, minBound, minBound.getOp());
- buildStatsPredicate(analyzer, slotRef, maxBound, maxBound.getOp());
+ addStatsOriginalConjunct(slotDesc.getParent(), inPred);
+ buildBinaryStatsPredicate(analyzer, slotRef, minBound, minBound.getOp());
+ buildBinaryStatsPredicate(analyzer, slotRef, maxBound, maxBound.getOp());
}
- private void addMinMaxOriginalConjunct(TupleDescriptor tupleDesc, Expr expr) {
- List<Expr> exprs = minMaxOriginalConjuncts_.get(tupleDesc);
- if (exprs == null) {
- exprs = new ArrayList<>();
- minMaxOriginalConjuncts_.put(tupleDesc, exprs);
- }
+ private void tryComputeIsNullStatsPredicate(Analyzer analyzer,
+ IsNullPredicate isNullPred) {
+ // Currently, only ORC tables can push down IS-NULL predicates.
+ if (!fileFormats_.contains(HdfsFileFormat.ORC)) return;
+ // Retrieve the left side of the IS-NULL predicate. Skip if it's not a simple slot.
+ SlotRef slotRef = isNullPred.getBoundSlot();
+ if (slotRef == null) return;
+ // This node is a table scan, so this must be a scanning slot.
+ Preconditions.checkState(slotRef.getDesc().isScanSlot());
+ // Skip the slot ref if it refers to an array's "pos" field.
+ if (slotRef.getDesc().isArrayPosRef()) return;
+ addStatsOriginalConjunct(slotRef.getDesc().getParent(), isNullPred);
+ SlotDescriptor slotDesc = analyzer.getDescTbl().copySlotDescriptor(statsTuple_,
+ slotRef.getDesc());
+ SlotRef slot = new SlotRef(slotDesc);
+ IsNullPredicate statsPred = new IsNullPredicate(slot, isNullPred.isNotNull());
+ statsPred.analyzeNoThrow(analyzer);
+ statsConjuncts_.add(statsPred);
+ }
+
+ private void addStatsOriginalConjunct(TupleDescriptor tupleDesc, Expr expr) {
+ List<Expr> exprs = statsOriginalConjuncts_.computeIfAbsent(
+ tupleDesc, k -> new ArrayList<>());
exprs.add(expr);
}
- private void tryComputeMinMaxPredicate(Analyzer analyzer, Expr pred) {
+ private void tryComputeStatsPredicate(Analyzer analyzer, Expr pred) {
if (pred instanceof BinaryPredicate) {
- tryComputeBinaryMinMaxPredicate(analyzer, (BinaryPredicate) pred);
+ tryComputeBinaryStatsPredicate(analyzer, (BinaryPredicate) pred);
} else if (pred instanceof InPredicate) {
- tryComputeInListMinMaxPredicate(analyzer, (InPredicate) pred);
+ tryComputeInListStatsPredicate(analyzer, (InPredicate) pred);
+ } else if (pred instanceof IsNullPredicate) {
+ tryComputeIsNullStatsPredicate(analyzer, (IsNullPredicate) pred);
}
}
@@ -660,28 +711,29 @@ public class HdfsScanNode extends ScanNode {
}
/**
- * Analyzes 'conjuncts_' and 'collectionConjuncts_', populates 'minMaxTuple_' with slots
- * for statistics values, and populates 'minMaxConjuncts_' with conjuncts pointing into
- * the 'minMaxTuple_'. Only conjuncts of the form <slot> <op> <constant> are supported,
- * and <op> must be one of LT, LE, GE, GT, or EQ.
+ * Analyzes 'conjuncts_' and 'collectionConjuncts_', populates 'statsTuple_' with slots
+ * for statistics values, and populates 'statsConjuncts_' with conjuncts pointing into
+ * the 'statsTuple_'. Binary conjuncts of the form <slot> <op> <constant> are supported,
+ * and <op> must be one of LT, LE, GE, GT, or EQ. IN-list and IS-NULL conjuncts are also
+ * supported.
*/
- private void computeMinMaxTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{
+ private void computeStatsTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{
Preconditions.checkNotNull(desc_.getPath());
String tupleName = desc_.getPath().toString() + " statistics";
DescriptorTable descTbl = analyzer.getDescTbl();
- minMaxTuple_ = descTbl.createTupleDescriptor(tupleName);
- minMaxTuple_.setPath(desc_.getPath());
+ statsTuple_ = descTbl.createTupleDescriptor(tupleName);
+ statsTuple_.setPath(desc_.getPath());
// Adds predicates for scalar, top-level columns.
- for (Expr pred: conjuncts_) tryComputeMinMaxPredicate(analyzer, pred);
+ for (Expr pred: conjuncts_) tryComputeStatsPredicate(analyzer, pred);
// Adds predicates for collections.
for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) {
if (notEmptyCollections_.contains(entry.getKey())) {
- for (Expr pred: entry.getValue()) tryComputeMinMaxPredicate(analyzer, pred);
+ for (Expr pred: entry.getValue()) tryComputeStatsPredicate(analyzer, pred);
}
}
- minMaxTuple_.computeMemLayout();
+ statsTuple_.computeMemLayout();
}
/**
@@ -689,11 +741,11 @@ public class HdfsScanNode extends ScanNode {
*/
public void initOverlapPredicate(Analyzer analyzer) {
if (!allParquet_) return;
- Preconditions.checkNotNull(minMaxTuple_);
+ Preconditions.checkNotNull(statsTuple_);
// Allow the tuple to accept new slots.
- minMaxTuple_.resetHasMemoryLayout();
+ statsTuple_.resetHasMemoryLayout();
- overlap_first_slot_idx_ = minMaxTuple_.getSlots().size();
+ overlap_first_slot_idx_ = statsTuple_.getSlots().size();
}
/**
@@ -799,7 +851,7 @@ public class HdfsScanNode extends ScanNode {
// Check if targetExpr refers to some column in one of the min/max conjuncts
// already formed. If so, do not add an overlap predicate as it may not be
// as effective as the conjunct.
- List<SlotDescriptor> slotDescs = minMaxTuple_.getSlots();
+ List<SlotDescriptor> slotDescs = statsTuple_.getSlots();
for (int i=0; i<overlap_first_slot_idx_; i++) {
if (slotDescs.get(i).getPath() == slotRefInScan.getDesc().getPath())
return false;
@@ -837,13 +889,13 @@ public class HdfsScanNode extends ScanNode {
}
}
- int firstSlotIdx = minMaxTuple_.getSlots().size();
+ int firstSlotIdx = statsTuple_.getSlots().size();
// Make two new slot descriptors to hold data min/max values (such as from
// row groups or pages in Parquet)) and append them to the tuple descriptor.
SlotDescriptor slotDescDataMin =
- analyzer.getDescTbl().copySlotDescriptor(minMaxTuple_, slotRefInScan.getDesc());
+ analyzer.getDescTbl().copySlotDescriptor(statsTuple_, slotRefInScan.getDesc());
SlotDescriptor slotDescDataMax =
- analyzer.getDescTbl().copySlotDescriptor(minMaxTuple_, slotRefInScan.getDesc());
+ analyzer.getDescTbl().copySlotDescriptor(statsTuple_, slotRefInScan.getDesc());
overlapPredicateDescs_.add(
new TOverlapPredicateDesc(filter.getFilterId().asInt(), firstSlotIdx));
@@ -854,7 +906,7 @@ public class HdfsScanNode extends ScanNode {
public void finalizeOverlapPredicate() {
if (!allParquet_) return;
// Recompute the memory layout for the min/max tuple.
- minMaxTuple_.computeMemLayout();
+ statsTuple_.computeMemLayout();
}
/**
@@ -1697,14 +1749,14 @@ public class HdfsScanNode extends ScanNode {
msg.hdfs_scan_node.setParquet_count_star_slot_offset(
countStarSlot_.getByteOffset());
}
- if (!minMaxConjuncts_.isEmpty()) {
- for (Expr e: minMaxConjuncts_) {
- msg.hdfs_scan_node.addToMin_max_conjuncts(e.treeToThrift());
+ if (!statsConjuncts_.isEmpty()) {
+ for (Expr e: statsConjuncts_) {
+ msg.hdfs_scan_node.addToStats_conjuncts(e.treeToThrift());
}
}
- if ( minMaxTuple_ != null ) {
- msg.hdfs_scan_node.setMin_max_tuple_id(minMaxTuple_.getId().asInt());
+ if (statsTuple_ != null) {
+ msg.hdfs_scan_node.setStats_tuple_id(statsTuple_.getId().asInt());
}
Map<Integer, List<Integer>> dictMap = new LinkedHashMap<>();
@@ -1851,7 +1903,7 @@ public class HdfsScanNode extends ScanNode {
String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
for (Map.Entry<TupleDescriptor, List<Expr>> entry :
- minMaxOriginalConjuncts_.entrySet()) {
+ statsOriginalConjuncts_.entrySet()) {
TupleDescriptor tupleDesc = entry.getKey();
List<Expr> exprs = entry.getValue();
String fileFormatStr;
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 78094a3..52c16cc 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -183,6 +183,35 @@ FROM {db_name}.{table_name};
---- DATASET
functional
---- BASE_TABLE_NAME
+alltypessmall_bool_sorted
+---- PARTITION_COLUMNS
+year int
+month int
+---- COLUMNS
+id int
+bool_col boolean
+tinyint_col tinyint
+smallint_col smallint
+int_col int
+bigint_col bigint
+float_col float
+double_col double
+date_string_col string
+string_col string
+timestamp_col timestamp
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} partition (year, month)
+SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+FROM {db_name}.alltypessmall
+where bool_col;
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} partition (year, month)
+SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+FROM {db_name}.alltypessmall
+where not bool_col;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
alltypestiny
---- PARTITION_COLUMNS
year int
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index c359af1..87e5e61 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -309,3 +309,6 @@ table_name:uncomp_src_alltypes, constraint:restrict_to, table_format:orc/def/blo
table_name:uncomp_src_decimal_tbl, constraint:restrict_to, table_format:orc/def/block
table_name:part_strings_with_quotes, constraint:restrict_to, table_format:text/none/none
+
+# 'alltypessmall_bool_sorted' is only used in ORC tests.
+table_name:alltypessmall_bool_sorted, constraint:restrict_to, table_format:orc/def/block
diff --git a/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
index a53a7d8..c878a29 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
@@ -203,6 +203,20 @@ select count(*) from functional_orc_def.alltypessmall where bigint_col = 100
aggregation(SUM, RowsRead): 0
====
---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col = true;
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col = false;
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
# Test on predicate x < min_val for float. No rows should be read by the ORC reader.
select count(*) from functional_orc_def.alltypessmall where float_col < 0
---- RESULTS
@@ -211,6 +225,14 @@ select count(*) from functional_orc_def.alltypessmall where float_col < 0
aggregation(SUM, RowsRead): 0
====
---- QUERY
+# Test on predicate x < a for float that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where float_col < 1
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
# Test on predicate x > max_val for float. No rows should be read by the ORC reader.
select count(*) from functional_orc_def.alltypessmall where float_col > 9.9
---- RESULTS
@@ -219,6 +241,14 @@ select count(*) from functional_orc_def.alltypessmall where float_col > 9.9
aggregation(SUM, RowsRead): 0
====
---- QUERY
+# Test on predicate x > a for float that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where float_col > 9
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
# Test on predicate x < min_val for double. No rows should be read by the ORC reader.
select count(*) from functional_orc_def.alltypessmall where double_col < 0
---- RESULTS
@@ -227,6 +257,14 @@ select count(*) from functional_orc_def.alltypessmall where double_col < 0
aggregation(SUM, RowsRead): 0
====
---- QUERY
+# Test on predicate x < a for double that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where double_col < 1
+---- RESULTS
+12
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
# Test on predicate x > max_val for double. No rows should be read by the ORC reader.
select count(*) from functional_orc_def.alltypessmall where double_col > 99
---- RESULTS
@@ -235,6 +273,14 @@ select count(*) from functional_orc_def.alltypessmall where double_col > 99
aggregation(SUM, RowsRead): 0
====
---- QUERY
+# Test on predicate x > a for double that can't filter out any RowGroups.
+select count(*) from functional_orc_def.alltypessmall where double_col > 90
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
# Test on predicate x < min_val for string. No rows should be read by the ORC reader.
select count(*) from functional_orc_def.alltypessmall where string_col < "0"
---- RESULTS
@@ -480,6 +526,14 @@ aggregation(SUM, RowsRead): 0
====
---- QUERY
select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col <= '0001-06-19';
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 10
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
where date_part in ("2017-11-27", "1399-06-27") and date_col > '2018-12-31';
---- RESULTS
0
@@ -487,6 +541,22 @@ where date_part in ("2017-11-27", "1399-06-27") and date_col > '2018-12-31';
aggregation(SUM, RowsRead): 0
====
---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col >= '2018-12-31';
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col = '2018-12-31';
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
# Test predicates on CHAR type. They are not pushed down (IMPALA-10882). Just make sure
# we don't hit DCHECKs.
select count(*) from functional_orc_def.chars_tiny where cs < cast('1aaaa' as char(5));
@@ -540,3 +610,254 @@ select count(*) from tpch_orc_def.lineitem where l_orderkey = 1609411;
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 13501
====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col in (-1, 10);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where tinyint_col in (1, 5);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where smallint_col in (-1, 10);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where smallint_col in (1, 5);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where int_col in (-1, 10);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where int_col in (1, 5);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+# The table has 4 files. 3 of them will be skipped.
+select count(*) from functional_orc_def.alltypessmall where id in (1, 3, 5);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 25
+====
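(Editor's note: an illustrative sketch of the IN-list leaf itself, under the same
upstream ORC C++ API assumptions as the earlier snippet.)

    // "id in (1, 3, 5)": a file whose id range (e.g. [25, 49]) contains none
    // of the literals has all of its RowGroups skipped, so only the file
    // covering ids [0, 24] contributes the 25 rows read above.
    std::unique_ptr<orc::SearchArgument> sarg =
        orc::SearchArgumentFactory::newBuilder()
            ->in("id", orc::PredicateDataType::LONG,
                 {orc::Literal(int64_t{1}), orc::Literal(int64_t{3}),
                  orc::Literal(int64_t{5})})
            .build();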
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+select count(*) from functional_orc_def.alltypessmall where bigint_col in (-1, 91);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where bigint_col in (10, 50);
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col in (true);
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall_bool_sorted where bool_col in (false);
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 50
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range.
+# The range of string_col is ['0', '9']. '/' is smaller than '0' and 'A' is larger
+# than '9'.
+select count(*) from functional_orc_def.alltypessmall where string_col in ('/', 'A');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
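(Editor's note: the same builder call with string literals — again only a hedged sketch
of the upstream API, not the patch's code.)

    // "string_col in ('/', 'A')": string literals are (pointer, length) pairs.
    // In ASCII, '/' sorts before '0' and 'A' after '9', so every RowGroup's
    // [min, max] range of ['0', '9'] excludes both values and nothing is read.
    std::unique_ptr<orc::SearchArgument> sarg =
        orc::SearchArgumentFactory::newBuilder()
            ->in("string_col", orc::PredicateDataType::STRING,
                 {orc::Literal("/", 1), orc::Literal("A", 1)})
            .build();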
+---- QUERY
+# Test pushing down IN-list predicate with literals inside the value range.
+select count(*) from functional_orc_def.alltypessmall where string_col in ('1', '5');
+---- RESULTS
+20
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 100
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col in ('0001-06-18', '2019-12-31');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.date_tbl
+where date_part in ("2017-11-27", "1399-06-27") and date_col in ('0001-06-19', '2018-12-31');
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 13
+====
+---- QUERY
+# Test pushing down IN-list predicate with literals outside the value range. Explicitly cast
+# the literals so the FE won't wrap 'd1' in a cast to INT, allowing the predicate to be
+# pushed down. Because the fix for ORC-517 is not included in the current Hive version
+# (3.1.3000.7.2.12.0-104), the ORC files have wrong statistics on the d1 column, showing a
+# minimum of 0. So we use -1 here to be smaller than that. The max of d1 is 132842, so we
+# use 132843.
+select count(*) from functional_orc_def.decimal_tbl
+where d1 in (cast(-1 as decimal(9,0)), cast(132843 as decimal(9,0)));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
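(Editor's note: for decimals the literal must carry precision and scale, which is exactly
what the explicit SQL casts above pin down. A sketch assuming the upstream orc::Literal
decimal constructor taking (Int128 value, precision, scale).)

    // cast(-1 as decimal(9,0)) / cast(132843 as decimal(9,0)) from the query
    // above. Without the casts the FE compares d1 as INT, and the predicate
    // is not handed to the ORC reader in this form.
    orc::Literal lo(orc::Int128(-1), /*precision=*/9, /*scale=*/0);
    orc::Literal hi(orc::Int128(132843), /*precision=*/9, /*scale=*/0);
    std::unique_ptr<orc::SearchArgument> sarg =
        orc::SearchArgumentFactory::newBuilder()
            ->in("d1", orc::PredicateDataType::DECIMAL, {lo, hi})
            .build();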
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d1 in (cast(0 as decimal(9,0)), cast(132842 as decimal(9,0)));
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d1 in (cast(1234 as decimal(9,0)), cast(2345 as decimal(9,0)), cast(12345 as decimal(9,0)));
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d3 in (cast(-1 as decimal(20,10)), cast(12346 as decimal(20,10)));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d3 in (cast(2 as decimal(20,10)), cast(200 as decimal(20,10)));
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl
+where d3 in (cast(12.3456789 as decimal(20,10)), cast(12345.6789 as decimal(20,10)));
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 5
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where bool_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where tinyint_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where smallint_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where int_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where bigint_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where float_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where double_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where string_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypessmall where timestamp_col is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d1 is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+select count(*) from functional_orc_def.decimal_tbl where d4 is null;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
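(Editor's note: the IS-NULL battery above exercises the remaining leaf type; a final
sketch under the same upstream ORC C++ API assumptions.)

    // "col is null": a RowGroup whose statistics report hasNull == false can
    // be skipped outright, which is why RowsRead is 0 in every test above
    // (none of these columns contain NULLs).
    std::unique_ptr<orc::SearchArgument> is_null =
        orc::SearchArgumentFactory::newBuilder()
            ->isNull("bool_col", orc::PredicateDataType::BOOLEAN)
            .build();

    // IS NOT NULL is expressed by negating the same leaf.
    std::unique_ptr<orc::SearchArgument> is_not_null =
        orc::SearchArgumentFactory::newBuilder()
            ->startNot()
            .isNull("bool_col", orc::PredicateDataType::BOOLEAN)
            .end()
            .build();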