You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/05/04 16:51:20 UTC
[impala] branch master updated: IMPALA-11277: Push-down IN predicate to iceberg
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 93fe446b8 IMPALA-11277: Push-down IN predicate to iceberg
93fe446b8 is described below
commit 93fe446b851ff96f7f3d229c355d83daa5e8f89e
Author: LPL <li...@sensorsdata.cn>
AuthorDate: Mon May 2 00:40:02 2022 +0800
IMPALA-11277: Push-down IN predicate to iceberg
Iceberg provides a rich API for pushing predicates down. Currently we only
push down BinaryPredicates such as EQ, LT, GT, etc. This commit implements
IN predicate push-down.
Testing:
- Added end-to-end testing for pushing down IN predicate to iceberg
Change-Id: Id4be9aa31a6353021b0eabc4485306c0b0e8bb07
Reviewed-on: http://gerrit.cloudera.org:8080/18463
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../org/apache/impala/planner/IcebergScanNode.java | 220 ++++----
.../QueryTest/iceberg-in-predicate-push-down.test | 624 +++++++++++++++++++++
tests/query_test/test_iceberg.py | 5 +
3 files changed, 749 insertions(+), 100 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index 1a998b2d3..1035cd367 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -22,7 +22,6 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.ListIterator;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
@@ -36,6 +35,7 @@ import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BoolLiteral;
import org.apache.impala.analysis.DateLiteral;
import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.InPredicate;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.MultiAggregateInfo;
import org.apache.impala.analysis.NumericLiteral;
@@ -44,6 +44,7 @@ import org.apache.impala.analysis.StringLiteral;
import org.apache.impala.analysis.TableRef;
import org.apache.impala.analysis.TimeTravelSpec;
import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeCatalogUtils;
import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
@@ -52,6 +53,7 @@ import org.apache.impala.catalog.IcebergColumn;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.InternalException;
@@ -184,132 +186,150 @@ public class IcebergScanNode extends HdfsScanNode {
* please refer: https://iceberg.apache.org/spec/#scan-planning
*/
private void extractIcebergConjuncts(Analyzer analyzer) throws ImpalaException {
- ListIterator<Expr> it = conjuncts_.listIterator();
- while (it.hasNext()) {
- tryConvertBinaryIcebergPredicate(analyzer, it.next());
+ for (Expr expr : conjuncts_) {
+ tryConvertIcebergPredicate(analyzer, expr);
}
}
/**
- * Transform impala binary predicate to iceberg predicate
+ * Returns Iceberg operator by BinaryPredicate operator, or null if the operation
+ * is not supported by Iceberg.
+ */
+ private Operation getIcebergOperator(BinaryPredicate.Operator op) {
+ switch (op) {
+ case EQ: return Operation.EQ;
+ case NE: return Operation.NOT_EQ;
+ case LE: return Operation.LT_EQ;
+ case GE: return Operation.GT_EQ;
+ case LT: return Operation.LT;
+ case GT: return Operation.GT;
+ default: return null;
+ }
+ }
+
+ /**
+ * Transform impala predicate to iceberg predicate
*/
- private boolean tryConvertBinaryIcebergPredicate(Analyzer analyzer, Expr expr)
+ private void tryConvertIcebergPredicate(Analyzer analyzer, Expr expr)
throws ImpalaException {
- if (! (expr instanceof BinaryPredicate)) return false;
+ if (expr instanceof BinaryPredicate) {
+ convertIcebergPredicate(analyzer, (BinaryPredicate) expr);
+ } else if (expr instanceof InPredicate) {
+ convertIcebergPredicate(analyzer, (InPredicate) expr);
+ }
+ }
- BinaryPredicate predicate = (BinaryPredicate) expr;
+ private void convertIcebergPredicate(Analyzer analyzer, BinaryPredicate predicate)
+ throws ImpalaException {
Operation op = getIcebergOperator(predicate.getOp());
- if (op == null) return false;
+ if (op == null) return;
- if (!(predicate.getChild(0) instanceof SlotRef)) return false;
+ // Do not convert if there is an implicit cast
+ if (!(predicate.getChild(0) instanceof SlotRef)) return;
SlotRef ref = (SlotRef) predicate.getChild(0);
- if (!(predicate.getChild(1) instanceof LiteralExpr)) return false;
+ if (!(predicate.getChild(1) instanceof LiteralExpr)) return;
LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
// If predicate contains map/struct, this column would be null
- if (ref.getDesc().getColumn() == null) return false;
+ Column col = ref.getDesc().getColumn();
+ if (col == null) return;
+
+ Object value = getIcebergValue(analyzer, ref, literal);
+ if (value == null) return;
+
+ icebergPredicates_.add(Expressions.predicate(op, col.getName(), value));
+ }
+
+ private void convertIcebergPredicate(Analyzer analyzer, InPredicate predicate)
+ throws ImpalaException {
+ // TODO: convert NOT_IN predicate
+ if (predicate.isNotIn()) return;
+
+ // Do not convert if there is an implicit cast
+ if (!(predicate.getChild(0) instanceof SlotRef)) return;
+ SlotRef ref = (SlotRef) predicate.getChild(0);
+
+ // If predicate contains map/struct, this column would be null
+ Column col = ref.getDesc().getColumn();
+ if (col == null) return;
- IcebergColumn iceCol = (IcebergColumn)ref.getDesc().getColumn();
+ // Expressions takes a list of values as Objects
+ List<Object> values = new ArrayList<>();
+ for (int i = 1; i < predicate.getChildren().size(); ++i) {
+ if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) return;
+ LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
+
+ // Cannot push IN or NOT_IN predicate with null literal values
+ if (Expr.IS_NULL_LITERAL.apply(literal)) return;
+
+ Object value = getIcebergValue(analyzer, ref, literal);
+ if (value == null) return;
+
+ values.add(value);
+ }
+
+ icebergPredicates_.add(Expressions.in(col.getName(), values));
+ }
+
+ private Object getIcebergValue(Analyzer analyzer, SlotRef ref, LiteralExpr literal)
+ throws ImpalaException {
+ IcebergColumn iceCol = (IcebergColumn) ref.getDesc().getColumn();
Schema iceSchema = icebergTable_.getIcebergSchema();
- String colName = iceCol.getName();
- UnboundPredicate unboundPredicate = null;
switch (literal.getType().getPrimitiveType()) {
- case BOOLEAN: {
- unboundPredicate = Expressions.predicate(op, colName,
- ((BoolLiteral) literal).getValue());
- break;
- }
+ case BOOLEAN: return ((BoolLiteral) literal).getValue();
case TINYINT:
case SMALLINT:
- case INT: {
- unboundPredicate = Expressions.predicate(op, colName,
- ((NumericLiteral) literal).getIntValue());
- break;
- }
- case BIGINT: {
- unboundPredicate = Expressions.predicate(op, colName,
- ((NumericLiteral) literal).getLongValue());
- break;
- }
- case FLOAT: {
- unboundPredicate = Expressions.predicate(op, colName,
- (float)((NumericLiteral) literal).getDoubleValue());
- break;
- }
- case DOUBLE: {
- unboundPredicate = Expressions.predicate(op, colName,
- ((NumericLiteral) literal).getDoubleValue());
- break;
- }
+ case INT: return ((NumericLiteral) literal).getIntValue();
+ case BIGINT: return ((NumericLiteral) literal).getLongValue();
+ case FLOAT: return (float) ((NumericLiteral) literal).getDoubleValue();
+ case DOUBLE: return ((NumericLiteral) literal).getDoubleValue();
case STRING:
case DATETIME:
- case CHAR: {
- unboundPredicate = Expressions.predicate(op, colName,
- ((StringLiteral) literal).getUnescapedValue());
- break;
- }
- case TIMESTAMP: {
- try {
- org.apache.iceberg.types.Type iceType = iceSchema.findType(iceCol.getFieldId());
- Preconditions.checkState(iceType instanceof Types.TimestampType);
- Types.TimestampType tsType = (Types.TimestampType)iceType;
- long unixMicros = 0;
- if (tsType.shouldAdjustToUTC()) {
- unixMicros = ExprUtil.localTimestampToUnixTimeMicros(analyzer, literal);
- } else {
- unixMicros = ExprUtil.utcTimestampToUnixTimeMicros(analyzer, literal);
- }
- unboundPredicate = Expressions.predicate(op, colName, unixMicros);
- } catch (InternalException ex) {
- // We cannot interpret the timestamp literal. Maybe the timestamp is invalid,
- // or the local timestamp ambigously converts to UTC due to daylight saving
- // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' converts to
- // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'.
- LOG.warn("Exception occurred during timestamp conversion: " + ex.toString() +
- "\nThis means timestamp predicate is not pushed to Iceberg, let Impala " +
- "backend handle it.");
- }
- break;
+ case CHAR: return ((StringLiteral) literal).getUnescapedValue();
+ case TIMESTAMP: return getIcebergTsValue(analyzer, literal, iceCol, iceSchema);
+ case DATE: return ((DateLiteral) literal).getValue();
+ case DECIMAL: return getIcebergDecimalValue(ref, (NumericLiteral) literal);
+ default: {
+ Preconditions.checkState(false,
+ "Unsupported iceberg type considered for predicate: %s",
+ literal.getType().toSql());
}
- case DATE: {
- int daysSinceEpoch = ((DateLiteral) literal).getValue();
- unboundPredicate = Expressions.predicate(op, colName, daysSinceEpoch);
- break;
- }
- case DECIMAL: {
- Type colType = ref.getDesc().getColumn().getType();
- int scale = colType.getDecimalDigits();
- BigDecimal literalValue = ((NumericLiteral) literal).getValue();
- if (literalValue.scale() <= scale) {
- // Iceberg DecimalLiteral needs to have the exact same scale.
- if (literalValue.scale() < scale) literalValue = literalValue.setScale(scale);
- unboundPredicate = Expressions.predicate(op, colName, literalValue);
- }
- break;
- }
- default: break;
}
- if (unboundPredicate == null) return false;
+ return null;
+ }
- icebergPredicates_.add(unboundPredicate);
+ private BigDecimal getIcebergDecimalValue(SlotRef ref, NumericLiteral literal) {
+ Type colType = ref.getDesc().getColumn().getType();
+ int scale = colType.getDecimalDigits();
+ BigDecimal literalValue = literal.getValue();
- return true;
+ if (literalValue.scale() > scale) return null;
+ // Iceberg DecimalLiteral needs to have the exact same scale.
+ if (literalValue.scale() < scale) return literalValue.setScale(scale);
+ return literalValue;
}
- /**
- * Returns Iceberg operator by BinaryPredicate operator, or null if the operation
- * is not supported by Iceberg.
- */
- private Operation getIcebergOperator(BinaryPredicate.Operator op) {
- switch (op) {
- case EQ: return Operation.EQ;
- case NE: return Operation.NOT_EQ;
- case LE: return Operation.LT_EQ;
- case GE: return Operation.GT_EQ;
- case LT: return Operation.LT;
- case GT: return Operation.GT;
- default: return null;
+ private Object getIcebergTsValue(Analyzer analyzer, LiteralExpr literal,
+ IcebergColumn iceCol, Schema iceSchema) throws AnalysisException {
+ try {
+ org.apache.iceberg.types.Type iceType = iceSchema.findType(iceCol.getFieldId());
+ Preconditions.checkState(iceType instanceof Types.TimestampType);
+ Types.TimestampType tsType = (Types.TimestampType) iceType;
+ if (tsType.shouldAdjustToUTC()) {
+ return ExprUtil.localTimestampToUnixTimeMicros(analyzer, literal);
+ } else {
+ return ExprUtil.utcTimestampToUnixTimeMicros(analyzer, literal);
+ }
+ } catch (InternalException ex) {
+ // We cannot interpret the timestamp literal. Maybe the timestamp is invalid,
+ // or the local timestamp ambigously converts to UTC due to daylight saving
+ // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' converts to
+ // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'.
+ LOG.warn("Exception occurred during timestamp conversion: " + ex.toString() +
+ "\nThis means timestamp predicate is not pushed to Iceberg, let Impala " +
+ "backend handle it.");
}
+ return null;
}
}
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
new file mode 100644
index 000000000..8aa4ae1b4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
@@ -0,0 +1,624 @@
+====
+---- QUERY
+# Test INT/BIGINT/FLOAT/DOUBLE/STRING/TIMESTAMP/DATE InPredicate push-down
+create table ice_pred_pd1 (
+ col_i INT,
+ col_bi BIGINT,
+ col_f FLOAT,
+ col_db DOUBLE,
+ col_str STRING,
+ col_ts TIMESTAMP,
+ col_dt DATE
+) partitioned by spec (col_i) stored as iceberg tblproperties ('write.format.default' = 'parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+ ice_pred_pd1
+values
+ (0, 12345678900, 3.140, 2.71820, 'Leonhard_A', '1400-01-01 00:00:00', DATE'1400-01-01'),
+ (0, 12345678901, 3.141, 2.71821, 'Leonhard_B', '1400-01-01 01:01:01', DATE'1400-01-01'),
+ (0, 12345678902, 3.142, 2.71822, 'Leonhard_C', '1400-01-01 02:02:02', DATE'1400-01-01'),
+ (1, 12345678903, 3.143, 2.71823, 'Leonhard_D', '1500-01-01 00:00:00', DATE'1500-01-01'),
+ (1, 12345678904, 3.144, 2.71824, 'Leonhard_E', '1500-01-01 01:01:01', DATE'1500-01-01'),
+ (1, 12345678905, 3.145, 2.71825, 'Leonhard_F', '1500-01-01 02:02:02', DATE'1500-01-01'),
+ (2, 12345678906, 3.146, 2.71826, 'Leonhard_G', '1600-01-01 00:00:00', DATE'1600-01-01'),
+ (2, 12345678907, 3.147, 2.71827, 'Leonhard_H', '1600-01-01 01:01:01', DATE'1600-01-01'),
+ (2, 12345678908, 3.148, 2.71828, 'Leonhard_I', '1600-01-01 02:02:02', DATE'1600-01-01');
+select count(1) from ice_pred_pd1;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
+====
+---- QUERY
+show files in ice_pred_pd1;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd1/data/col_i=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd1/data/col_i=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd1/data/col_i=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Start testing predicate push-down for int column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (0, 1, 2);
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches two row groups
+# InList: constant expr
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (ceil(-0.1), 1 * 2);
+---- RESULTS
+6
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (0);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (-1, 3);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: implicit cast
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (0, 2.0);
+---- RESULTS
+6
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: non-constant expr
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (col_bi);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: mixed constant and non-constant exprs
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (cast(col_bi as int), 2);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+create table ice_pred_pd1_int (
+ col_i INT
+) stored as iceberg tblproperties ('write.format.default' = 'parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+ ice_pred_pd1_int
+values
+ (1),
+ (3),
+ (5);
+select
+ count(1)
+from
+ ice_pred_pd1_int;
+---- RESULTS
+3
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: select clause
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_i in (select distinct col_i from ice_pred_pd1_int);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+====
+---- QUERY
+# Start testing predicate push-down for bigint column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_bi in (12345678900, 12345678903, 12345678906);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_bi in (12345678900);
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_bi in (12345678909);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for float column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_f in (3.140, 3.143, 3.146);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_f in (3.140);
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_f in (3.149);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for double column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_db in (2.71820, 2.71823, 2.71826);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_db in (2.71820);
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_db in (2.71829);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for string column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_str in ('Leonhard_A', 'Leonhard_D', 'Leonhard_G');
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_str in ('Leonhard_A');
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_str in ('Leonhard_J');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for timestamp column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_ts in ('1400-01-01 00:00:00', '1500-01-01 00:00:00', '1600-01-01 00:00:00');
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_ts in ('1400-01-01 00:00:00');
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_ts in ('1700-01-01 00:00:00');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for date column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_dt in (DATE'1400-01-01', DATE'1500-01-01', DATE'1600-01-01');
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_dt in (DATE'1400-01-01');
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd1
+where
+ col_dt in (DATE'1700-01-01');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Test DECIMAL(9, 3)/DECIMAL(18, 3)/DECIMAL(38, 3) InPredicate push-down
+create table ice_pred_pd2 (
+ p INT,
+ d1 DECIMAL(9, 3),
+ d2 DECIMAL(18, 3),
+ d3 DECIMAL(38, 3)
+) partitioned by spec (p) stored as iceberg tblproperties ('write.format.default' = 'parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+ ice_pred_pd2
+values
+ (
+ 0,
+ 123.450,
+ 1234567890.120,
+ 1234567890123456789.010
+ ),
+ (
+ 0,
+ 123.451,
+ 1234567890.121,
+ 1234567890123456789.011
+ ),
+ (
+ 0,
+ 123.452,
+ 1234567890.122,
+ 1234567890123456789.012
+ ),
+ (
+ 1,
+ 123.453,
+ 1234567890.123,
+ 1234567890123456789.013
+ ),
+ (
+ 1,
+ 123.454,
+ 1234567890.124,
+ 1234567890123456789.014
+ ),
+ (
+ 1,
+ 123.455,
+ 1234567890.125,
+ 1234567890123456789.015
+ ),
+ (
+ 2,
+ 123.456,
+ 1234567890.126,
+ 1234567890123456789.016
+ ),
+ (
+ 2,
+ 123.457,
+ 1234567890.127,
+ 1234567890123456789.017
+ ),
+ (
+ 2,
+ 123.458,
+ 1234567890.128,
+ 1234567890123456789.018
+ );
+select count(1) from ice_pred_pd2;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
+====
+---- QUERY
+show files in ice_pred_pd2;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd2/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd2/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd2/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Start testing predicate push-down for decimal(9, 3) column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d1 in (123.450, 123.453, 123.456);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d1 in (123.450, 123.451, 123.452);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d1 in (123.459);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for decimal(18, 3) column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d2 in (1234567890.120, 1234567890.123, 1234567890.126);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d2 in (1234567890.120, 1234567890.121, 1234567890.122);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d2 in (1234567890.129);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for decimal(38, 3) column
+# The IN predicate matches all row groups
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d3 in (1234567890123456789.010, 1234567890123456789.013, 1234567890123456789.016);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d3 in (1234567890123456789.010, 1234567890123456789.011, 1234567890123456789.012);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+ count(1)
+from
+ ice_pred_pd2
+where
+ d3 in (1234567890123456789.019);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 725671a35..aab727fb6 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -596,3 +596,8 @@ class TestIcebergTable(ImpalaTestSuite):
profile = result.runtime_profile
split_stats = collect_split_stats(profile)
assert ref_split_stats == split_stats
+
+ @pytest.mark.execute_serially
+ def test_in_predicate_push_down(self, vector, unique_database):
+ self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector,
+ use_db=unique_database)