You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/05/04 16:51:20 UTC

[impala] branch master updated: IMPALA-11277: Push-down IN predicate to iceberg

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 93fe446b8 IMPALA-11277: Push-down IN predicate to iceberg
93fe446b8 is described below

commit 93fe446b851ff96f7f3d229c355d83daa5e8f89e
Author: LPL <li...@sensorsdata.cn>
AuthorDate: Mon May 2 00:40:02 2022 +0800

    IMPALA-11277: Push-down IN predicate to iceberg
    
    Iceberg provides a rich API for pushing predicates down. Currently we only
    push down BinaryPredicates such as EQ, LT, GT, etc. This commit implements
    push-down of IN predicates.
    
    Testing:
      - Added end-to-end tests for pushing down IN predicates to Iceberg
    
    Change-Id: Id4be9aa31a6353021b0eabc4485306c0b0e8bb07
    Reviewed-on: http://gerrit.cloudera.org:8080/18463
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/planner/IcebergScanNode.java | 220 ++++----
 .../QueryTest/iceberg-in-predicate-push-down.test  | 624 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |   5 +
 3 files changed, 749 insertions(+), 100 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index 1a998b2d3..1035cd367 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -22,7 +22,6 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.ListIterator;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.DataFile;
@@ -36,6 +35,7 @@ import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.DateLiteral;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.InPredicate;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.MultiAggregateInfo;
 import org.apache.impala.analysis.NumericLiteral;
@@ -44,6 +44,7 @@ import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TimeTravelSpec;
 import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
@@ -52,6 +53,7 @@ import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
@@ -184,132 +186,150 @@ public class IcebergScanNode extends HdfsScanNode {
    * please refer: https://iceberg.apache.org/spec/#scan-planning
    */
   private void extractIcebergConjuncts(Analyzer analyzer) throws ImpalaException {
-    ListIterator<Expr> it = conjuncts_.listIterator();
-    while (it.hasNext()) {
-      tryConvertBinaryIcebergPredicate(analyzer, it.next());
+    for (Expr expr : conjuncts_) {
+      tryConvertIcebergPredicate(analyzer, expr);
     }
   }
 
   /**
-   * Transform impala binary predicate to iceberg predicate
+   * Returns Iceberg operator by BinaryPredicate operator, or null if the operation
+   * is not supported by Iceberg.
+   */
+  private Operation getIcebergOperator(BinaryPredicate.Operator op) {
+    switch (op) {
+      case EQ: return Operation.EQ;
+      case NE: return Operation.NOT_EQ;
+      case LE: return Operation.LT_EQ;
+      case GE: return Operation.GT_EQ;
+      case LT: return Operation.LT;
+      case GT: return Operation.GT;
+      default: return null;
+    }
+  }
+
+  /**
+   * Transform impala predicate to iceberg predicate
    */
-  private boolean tryConvertBinaryIcebergPredicate(Analyzer analyzer, Expr expr)
+  private void tryConvertIcebergPredicate(Analyzer analyzer, Expr expr)
       throws ImpalaException {
-    if (! (expr instanceof BinaryPredicate)) return false;
+    if (expr instanceof BinaryPredicate) {
+      convertIcebergPredicate(analyzer, (BinaryPredicate) expr);
+    } else if (expr instanceof InPredicate) {
+      convertIcebergPredicate(analyzer, (InPredicate) expr);
+    }
+  }
 
-    BinaryPredicate predicate = (BinaryPredicate) expr;
+  private void convertIcebergPredicate(Analyzer analyzer, BinaryPredicate predicate)
+      throws ImpalaException {
     Operation op = getIcebergOperator(predicate.getOp());
-    if (op == null) return false;
+    if (op == null) return;
 
-    if (!(predicate.getChild(0) instanceof SlotRef)) return false;
+    // Do not convert if there is an implicit cast
+    if (!(predicate.getChild(0) instanceof SlotRef)) return;
     SlotRef ref = (SlotRef) predicate.getChild(0);
 
-    if (!(predicate.getChild(1) instanceof LiteralExpr)) return false;
+    if (!(predicate.getChild(1) instanceof LiteralExpr)) return;
     LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
 
     // If predicate contains map/struct, this column would be null
-    if (ref.getDesc().getColumn() == null) return false;
+    Column col = ref.getDesc().getColumn();
+    if (col == null) return;
+
+    Object value = getIcebergValue(analyzer, ref, literal);
+    if (value == null) return;
+
+    icebergPredicates_.add(Expressions.predicate(op, col.getName(), value));
+  }
+
+  private void convertIcebergPredicate(Analyzer analyzer, InPredicate predicate)
+      throws ImpalaException {
+    // TODO: convert NOT_IN predicate
+    if (predicate.isNotIn()) return;
+
+    // Do not convert if there is an implicit cast
+    if (!(predicate.getChild(0) instanceof SlotRef)) return;
+    SlotRef ref = (SlotRef) predicate.getChild(0);
+
+    // If predicate contains map/struct, this column would be null
+    Column col = ref.getDesc().getColumn();
+    if (col == null) return;
 
-    IcebergColumn iceCol = (IcebergColumn)ref.getDesc().getColumn();
+    // Expressions takes a list of values as Objects
+    List<Object> values = new ArrayList<>();
+    for (int i = 1; i < predicate.getChildren().size(); ++i) {
+      if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) return;
+      LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
+
+      // Cannot push IN or NOT_IN predicate with null literal values
+      if (Expr.IS_NULL_LITERAL.apply(literal)) return;
+
+      Object value = getIcebergValue(analyzer, ref, literal);
+      if (value == null) return;
+
+      values.add(value);
+    }
+
+    icebergPredicates_.add(Expressions.in(col.getName(), values));
+  }
+
+  private Object getIcebergValue(Analyzer analyzer, SlotRef ref, LiteralExpr literal)
+      throws ImpalaException {
+    IcebergColumn iceCol = (IcebergColumn) ref.getDesc().getColumn();
     Schema iceSchema = icebergTable_.getIcebergSchema();
-    String colName = iceCol.getName();
-    UnboundPredicate unboundPredicate = null;
     switch (literal.getType().getPrimitiveType()) {
-      case BOOLEAN: {
-        unboundPredicate = Expressions.predicate(op, colName,
-            ((BoolLiteral) literal).getValue());
-        break;
-      }
+      case BOOLEAN: return ((BoolLiteral) literal).getValue();
       case TINYINT:
       case SMALLINT:
-      case INT: {
-        unboundPredicate = Expressions.predicate(op, colName,
-            ((NumericLiteral) literal).getIntValue());
-        break;
-      }
-      case BIGINT: {
-        unboundPredicate = Expressions.predicate(op, colName,
-            ((NumericLiteral) literal).getLongValue());
-        break;
-      }
-      case FLOAT: {
-        unboundPredicate = Expressions.predicate(op, colName,
-            (float)((NumericLiteral) literal).getDoubleValue());
-        break;
-      }
-      case DOUBLE: {
-        unboundPredicate = Expressions.predicate(op, colName,
-            ((NumericLiteral) literal).getDoubleValue());
-        break;
-      }
+      case INT: return ((NumericLiteral) literal).getIntValue();
+      case BIGINT: return ((NumericLiteral) literal).getLongValue();
+      case FLOAT: return (float) ((NumericLiteral) literal).getDoubleValue();
+      case DOUBLE: return ((NumericLiteral) literal).getDoubleValue();
       case STRING:
       case DATETIME:
-      case CHAR: {
-        unboundPredicate = Expressions.predicate(op, colName,
-            ((StringLiteral) literal).getUnescapedValue());
-        break;
-      }
-      case TIMESTAMP: {
-        try {
-          org.apache.iceberg.types.Type iceType = iceSchema.findType(iceCol.getFieldId());
-          Preconditions.checkState(iceType instanceof Types.TimestampType);
-          Types.TimestampType tsType = (Types.TimestampType)iceType;
-          long unixMicros = 0;
-          if (tsType.shouldAdjustToUTC()) {
-            unixMicros = ExprUtil.localTimestampToUnixTimeMicros(analyzer, literal);
-          } else {
-            unixMicros = ExprUtil.utcTimestampToUnixTimeMicros(analyzer, literal);
-          }
-          unboundPredicate = Expressions.predicate(op, colName, unixMicros);
-        } catch (InternalException ex) {
-          // We cannot interpret the timestamp literal. Maybe the timestamp is invalid,
-          // or the local timestamp ambigously converts to UTC due to daylight saving
-          // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' converts to
-          // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'.
-          LOG.warn("Exception occurred during timestamp conversion: " + ex.toString() +
-              "\nThis means timestamp predicate is not pushed to Iceberg, let Impala " +
-              "backend handle it.");
-        }
-        break;
+      case CHAR: return ((StringLiteral) literal).getUnescapedValue();
+      case TIMESTAMP: return getIcebergTsValue(analyzer, literal, iceCol, iceSchema);
+      case DATE: return ((DateLiteral) literal).getValue();
+      case DECIMAL: return getIcebergDecimalValue(ref, (NumericLiteral) literal);
+      default: {
+        Preconditions.checkState(false,
+            "Unsupported iceberg type considered for predicate: %s",
+            literal.getType().toSql());
       }
-      case DATE: {
-        int daysSinceEpoch = ((DateLiteral) literal).getValue();
-        unboundPredicate = Expressions.predicate(op, colName, daysSinceEpoch);
-        break;
-      }
-      case DECIMAL: {
-        Type colType = ref.getDesc().getColumn().getType();
-        int scale = colType.getDecimalDigits();
-        BigDecimal literalValue = ((NumericLiteral) literal).getValue();
-        if (literalValue.scale() <= scale) {
-          // Iceberg DecimalLiteral needs to have the exact same scale.
-          if (literalValue.scale() < scale) literalValue = literalValue.setScale(scale);
-          unboundPredicate = Expressions.predicate(op, colName, literalValue);
-        }
-        break;
-      }
-      default: break;
     }
-    if (unboundPredicate == null) return false;
+    return null;
+  }
 
-    icebergPredicates_.add(unboundPredicate);
+  private BigDecimal getIcebergDecimalValue(SlotRef ref, NumericLiteral literal) {
+    Type colType = ref.getDesc().getColumn().getType();
+    int scale = colType.getDecimalDigits();
+    BigDecimal literalValue = literal.getValue();
 
-    return true;
+    if (literalValue.scale() > scale) return null;
+    // Iceberg DecimalLiteral needs to have the exact same scale.
+    if (literalValue.scale() < scale) return literalValue.setScale(scale);
+    return literalValue;
   }
 
-  /**
-   * Returns Iceberg operator by BinaryPredicate operator, or null if the operation
-   * is not supported by Iceberg.
-   */
-  private Operation getIcebergOperator(BinaryPredicate.Operator op) {
-    switch (op) {
-      case EQ: return Operation.EQ;
-      case NE: return Operation.NOT_EQ;
-      case LE: return Operation.LT_EQ;
-      case GE: return Operation.GT_EQ;
-      case LT: return Operation.LT;
-      case GT: return Operation.GT;
-      default: return null;
+  private Object getIcebergTsValue(Analyzer analyzer, LiteralExpr literal,
+      IcebergColumn iceCol, Schema iceSchema) throws AnalysisException {
+    try {
+      org.apache.iceberg.types.Type iceType = iceSchema.findType(iceCol.getFieldId());
+      Preconditions.checkState(iceType instanceof Types.TimestampType);
+      Types.TimestampType tsType = (Types.TimestampType) iceType;
+      if (tsType.shouldAdjustToUTC()) {
+        return ExprUtil.localTimestampToUnixTimeMicros(analyzer, literal);
+      } else {
+        return ExprUtil.utcTimestampToUnixTimeMicros(analyzer, literal);
+      }
+    } catch (InternalException ex) {
+      // We cannot interpret the timestamp literal. Maybe the timestamp is invalid,
+      // or the local timestamp ambigously converts to UTC due to daylight saving
+      // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' converts to
+      // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'.
+      LOG.warn("Exception occurred during timestamp conversion: " + ex.toString() +
+          "\nThis means timestamp predicate is not pushed to Iceberg, let Impala " +
+          "backend handle it.");
     }
+    return null;
   }
 }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
new file mode 100644
index 000000000..8aa4ae1b4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
@@ -0,0 +1,624 @@
+====
+---- QUERY
+# Test INT/BIGINT/FLOAT/DOUBLE/STRING/TIMESTAMP/DATE InPredicate push-down
+create table ice_pred_pd1 (
+  col_i INT,
+  col_bi BIGINT,
+  col_f FLOAT,
+  col_db DOUBLE,
+  col_str STRING,
+  col_ts TIMESTAMP,
+  col_dt DATE
+) partitioned by spec (col_i) stored as iceberg tblproperties ('write.format.default' = 'parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+  ice_pred_pd1
+values
+  (0, 12345678900, 3.140, 2.71820, 'Leonhard_A', '1400-01-01 00:00:00', DATE'1400-01-01'),
+  (0, 12345678901, 3.141, 2.71821, 'Leonhard_B', '1400-01-01 01:01:01', DATE'1400-01-01'),
+  (0, 12345678902, 3.142, 2.71822, 'Leonhard_C', '1400-01-01 02:02:02', DATE'1400-01-01'),
+  (1, 12345678903, 3.143, 2.71823, 'Leonhard_D', '1500-01-01 00:00:00', DATE'1500-01-01'),
+  (1, 12345678904, 3.144, 2.71824, 'Leonhard_E', '1500-01-01 01:01:01', DATE'1500-01-01'),
+  (1, 12345678905, 3.145, 2.71825, 'Leonhard_F', '1500-01-01 02:02:02', DATE'1500-01-01'),
+  (2, 12345678906, 3.146, 2.71826, 'Leonhard_G', '1600-01-01 00:00:00', DATE'1600-01-01'),
+  (2, 12345678907, 3.147, 2.71827, 'Leonhard_H', '1600-01-01 01:01:01', DATE'1600-01-01'),
+  (2, 12345678908, 3.148, 2.71828, 'Leonhard_I', '1600-01-01 02:02:02', DATE'1600-01-01');
+select count(1) from ice_pred_pd1;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
+====
+---- QUERY
+show files in ice_pred_pd1;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd1/data/col_i=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd1/data/col_i=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd1/data/col_i=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Start testing predicate push-down for int column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (0, 1, 2);
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches two row group
+# InList: constant expr
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (ceil(-0.1), 1 * 2);
+---- RESULTS
+6
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (0);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (-1, 3);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: implicit cast
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (0, 2.0);
+---- RESULTS
+6
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: non-constant expr
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (col_bi);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: mixed constant and non-constant exprs
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (cast(col_bi as int), 2);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+create table ice_pred_pd1_int (
+  col_i INT
+) stored as iceberg tblproperties ('write.format.default' = 'parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+  ice_pred_pd1_int
+values
+  (1),
+  (3),
+  (5);
+select
+  count(1)
+from
+  ice_pred_pd1_int;
+---- RESULTS
+3
+====
+---- QUERY
+# The IN predicate will not push down
+# InList: select clause
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (select distinct col_i from ice_pred_pd1_int);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+====
+---- QUERY
+# Start testing predicate push-down for bigint column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_bi in (12345678900, 12345678903, 12345678906);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_bi in (12345678900);
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_bi in (12345678909);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for float column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_f in (3.140, 3.143, 3.146);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_f in (3.140);
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_f in (3.149);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for double column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_db in (2.71820, 2.71823, 2.71826);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_db in (2.71820);
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_db in (2.71829);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for string column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_str in ('Leonhard_A', 'Leonhard_D', 'Leonhard_G');
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_str in ('Leonhard_A');
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_str in ('Leonhard_J');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for timestamp column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_ts in ('1400-01-01 00:00:00', '1500-01-01 00:00:00', '1600-01-01 00:00:00');
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_ts in ('1400-01-01 00:00:00');
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_ts in ('1700-01-01 00:00:00');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for date column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_dt in (DATE'1400-01-01', DATE'1500-01-01', DATE'1600-01-01');
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_dt in (DATE'1400-01-01');
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_dt in (DATE'1700-01-01');
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Test DECIMAL(9, 3)/DECIMAL(18, 3)/DECIMAL(38, 3) InPredicate push-down
+create table ice_pred_pd2 (
+  p INT,
+  d1 DECIMAL(9, 3),
+  d2 DECIMAL(18, 3),
+  d3 DECIMAL(38, 3)
+) partitioned by spec (p) stored as iceberg tblproperties ('write.format.default' = 'parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+  ice_pred_pd2
+values
+  (
+    0,
+    123.450,
+    1234567890.120,
+    1234567890123456789.010
+  ),
+  (
+    0,
+    123.451,
+    1234567890.121,
+    1234567890123456789.011
+  ),
+  (
+    0,
+    123.452,
+    1234567890.122,
+    1234567890123456789.012
+  ),
+  (
+    1,
+    123.453,
+    1234567890.123,
+    1234567890123456789.013
+  ),
+  (
+    1,
+    123.454,
+    1234567890.124,
+    1234567890123456789.014
+  ),
+  (
+    1,
+    123.455,
+    1234567890.125,
+    1234567890123456789.015
+  ),
+  (
+    2,
+    123.456,
+    1234567890.126,
+    1234567890123456789.016
+  ),
+  (
+    2,
+    123.457,
+    1234567890.127,
+    1234567890123456789.017
+  ),
+  (
+    2,
+    123.458,
+    1234567890.128,
+    1234567890123456789.018
+  );
+select count(1) from ice_pred_pd2;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
+====
+---- QUERY
+show files in ice_pred_pd2;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd2/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd2/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_pred_pd2/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Start testing predicate push-down for decimal(9, 3) column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d1 in (123.450, 123.453, 123.456);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d1 in (123.450, 123.451, 123.452);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d1 in (123.459);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for decimal(18, 3) column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d2 in (1234567890.120, 1234567890.123, 1234567890.126);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d2 in (1234567890.120, 1234567890.121, 1234567890.122);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d2 in (1234567890.129);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# Start testing predicate push-down for decimal(38, 3) column
+# The IN predicate matches all row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d3 in (1234567890123456789.010, 1234567890123456789.013, 1234567890123456789.016);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+# The IN predicate matches one row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d3 in (1234567890123456789.010, 1234567890123456789.011, 1234567890123456789.012);
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+# The IN predicate does not match any row group
+select
+  count(1)
+from
+  ice_pred_pd2
+where
+  d3 in (1234567890123456789.019);
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 725671a35..aab727fb6 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -596,3 +596,8 @@ class TestIcebergTable(ImpalaTestSuite):
         profile = result.runtime_profile
         split_stats = collect_split_stats(profile)
         assert ref_split_stats == split_stats
+
+  @pytest.mark.execute_serially
+  def test_in_predicate_push_down(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector,
+                       use_db=unique_database)