You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/04 04:26:44 UTC

[flink-table-store] branch master updated: [FLINK-28063] Introduce dedicated In and NotIn predicate function

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new d11a287c [FLINK-28063] Introduce dedicated In and NotIn predicate function
d11a287c is described below

commit d11a287c756713c6b16a6c93291566a780f97b4a
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Jul 4 12:26:40 2022 +0800

    [FLINK-28063] Introduce dedicated In and NotIn predicate function
    
    This closes #189
---
 .../flink/table/store/file/predicate/In.java       |  71 ++
 .../flink/table/store/file/predicate/NotIn.java    |  71 ++
 .../store/file/predicate/PredicateBuilder.java     |  12 +-
 .../store/file/predicate/PredicateConverter.java   |   9 +
 .../store/file/predicate/PredicateBuilderTest.java |  70 --
 .../file/predicate/PredicateConverterTest.java     | 553 +++++++++++++++-
 .../table/store/file/predicate/PredicateTest.java  | 736 +++------------------
 .../SearchArgumentToPredicateConverterTest.java    |  20 +-
 .../table/store/spark/SparkFilterConverter.java    |  12 +
 .../store/spark/SparkFilterConverterTest.java      |   7 +
 10 files changed, 817 insertions(+), 744 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
new file mode 100644
index 00000000..aa338945
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafFunction} to eval in; a null field or a null literal never matches. */
+public class In implements LeafFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final In INSTANCE = new In(); // the only instance; constructor is private
+
+    private In() {}
+
+    @Override
+    public boolean test(LogicalType type, Object field, List<Object> literals) {
+        if (field == null) { // a null field never matches
+            return false;
+        }
+        for (Object literal : literals) { // null literals are skipped
+            if (literal != null && compareLiteral(type, literal, field) == 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+        if (rowCount == fieldStats.nullCount()) { // null count equals row count
+            return false;
+        }
+        for (Object literal : literals) { // pass if a non-null literal is >= min and <= max
+            if (literal != null
+                    && compareLiteral(type, literal, fieldStats.minValue()) >= 0
+                    && compareLiteral(type, literal, fieldStats.maxValue()) <= 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(NotIn.INSTANCE); // the negation of IN is NOT IN
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
new file mode 100644
index 00000000..d710da0f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafFunction} to eval not in; a null field or a null literal yields false. */
+public class NotIn implements LeafFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final NotIn INSTANCE = new NotIn(); // the only instance; constructor is private
+
+    private NotIn() {}
+
+    @Override
+    public boolean test(LogicalType type, Object field, List<Object> literals) {
+        if (field == null) { // a null field never passes
+            return false;
+        }
+        for (Object literal : literals) { // a null literal or one equal to the field rejects
+            if (literal == null || compareLiteral(type, literal, field) == 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+        if (rowCount == fieldStats.nullCount()) { // null count equals row count
+            return false;
+        }
+        for (Object literal : literals) { // reject on a null literal or min == literal == max
+            if (literal == null
+                    || (compareLiteral(type, literal, fieldStats.minValue()) == 0
+                            && compareLiteral(type, literal, fieldStats.maxValue()) == 0)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(In.INSTANCE); // the negation of NOT IN is IN
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index c3888ecf..93921a50 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -95,13 +95,11 @@ public class PredicateBuilder {
     }
 
     public Predicate in(int idx, List<Object> literals) {
-        Preconditions.checkArgument(
-                literals.size() > 0,
-                "There must be at least 1 literal to construct an IN predicate");
-        return literals.stream()
-                .map(l -> equal(idx, l))
-                .reduce((a, b) -> new CompoundPredicate(Or.INSTANCE, Arrays.asList(a, b)))
-                .get();
+        return new LeafPredicate(In.INSTANCE, rowType.getTypeAt(idx), idx, literals); // one IN leaf
+    }
+
+    public Predicate notIn(int idx, List<Object> literals) { // NOT IN as a single leaf predicate
+        return new LeafPredicate(NotIn.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+    }
 
     public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index f53ee9bc..baaf768d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -83,6 +84,14 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
             return visitBiFunction(children, builder::lessThan, builder::greaterThan);
         } else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
             return visitBiFunction(children, builder::lessOrEqual, builder::greaterOrEqual);
+        } else if (func == BuiltInFunctionDefinitions.IN) { // children: field, then literals
+            FieldReferenceExpression fieldRefExpr =
+                    extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+            List<Object> literals = new ArrayList<>();
+            for (int i = 1; i < children.size(); i++) {
+                literals.add(extractLiteral(fieldRefExpr.getOutputDataType(), children.get(i)));
+            }
+            return builder.in(fieldRefExpr.getFieldIndex(), literals); // column, not input index
         } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
             return extractFieldReference(children.get(0))
                     .map(FieldReferenceExpression::getFieldIndex)
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
index 4f3bcbeb..4152b272 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -31,76 +31,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link PredicateBuilder}. */
 public class PredicateBuilderTest {
 
-    @Test
-    public void testIn() {
-        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        Predicate predicate = builder.in(0, Arrays.asList(1, 3));
-
-        assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
-        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
-        assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
-        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
-                .isEqualTo(false);
-    }
-
-    @Test
-    public void testInNull() {
-        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        Predicate predicate = builder.in(0, Arrays.asList(1, null, 3));
-
-        assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
-        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
-        assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
-        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
-                .isEqualTo(false);
-    }
-
-    @Test
-    public void testNotIn() {
-        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        Predicate predicate = builder.in(0, Arrays.asList(1, 3)).negate().get();
-
-        assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
-        assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
-        assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
-        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
-                .isEqualTo(false);
-    }
-
-    @Test
-    public void testNotInNull() {
-        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        Predicate predicate = builder.in(0, Arrays.asList(1, null, 3)).negate().get();
-
-        assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
-        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
-        assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
-        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
-                .isEqualTo(false);
-    }
-
     @Test
     public void testBetween() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
index 7ffa337b..137232ad 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -19,23 +19,35 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -63,6 +75,7 @@ public class PredicateConverterTest {
         FieldReferenceExpression longRefExpr =
                 new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 0);
         ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
+        ValueLiteralExpression intLitExpr2 = new ValueLiteralExpression(20);
         long longLit = 10L;
         ValueLiteralExpression nullLongLitExpr =
                 new ValueLiteralExpression(null, DataTypes.BIGINT());
@@ -189,6 +202,544 @@ public class PredicateConverterTest {
                                                 DataTypes.BOOLEAN())),
                                 DataTypes.BOOLEAN()),
                         PredicateBuilder.or(
-                                BUILDER.notEqual(0, longLit), BUILDER.equal(1, doubleLit))));
+                                BUILDER.notEqual(0, longLit), BUILDER.equal(1, doubleLit))),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.IN,
+                                Arrays.asList(
+                                        longRefExpr, intLitExpr, nullLongLitExpr, intLitExpr2),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.in(0, Arrays.asList(10L, null, 20L))));
+    }
+
+    @MethodSource("provideLikeExpressions")
+    @ParameterizedTest
+    public void testStartsWith( // converts the call, then checks it against rows and stats
+            CallExpression callExpression,
+            List<Object[]> valuesList,
+            List<Boolean> expectedForValues,
+            List<Long> rowCountList,
+            List<FieldStats[]> statsList,
+            List<Boolean> expectedForStats) {
+        Predicate predicate =
+                callExpression.accept(new PredicateConverter(RowType.of(new VarCharType())));
+        IntStream.range(0, valuesList.size()) // row-level checks, paired by index
+                .forEach(
+                        i ->
+                                assertThat(predicate.test(valuesList.get(i)))
+                                        .isEqualTo(expectedForValues.get(i)));
+        IntStream.range(0, rowCountList.size()) // stats-level checks, paired by index
+                .forEach(
+                        i ->
+                                assertThat(predicate.test(rowCountList.get(i), statsList.get(i)))
+                                        .isEqualTo(expectedForStats.get(i)));
+    }
+
+    public static Stream<Arguments> provideLikeExpressions() { // arguments for testStartsWith
+        CallExpression expr1 = // LIKE 'abd%' with no escape character
+                call(
+                        BuiltInFunctionDefinitions.LIKE,
+                        field(0, STRING()),
+                        literal("abd%", STRING()));
+        List<Object[]> valuesList1 =
+                Arrays.asList(
+                        new Object[] {null},
+                        new Object[] {StringData.fromString("a")},
+                        new Object[] {StringData.fromString("ab")},
+                        new Object[] {StringData.fromString("abd")},
+                        new Object[] {StringData.fromString("abd%")},
+                        new Object[] {StringData.fromString("abd1")},
+                        new Object[] {StringData.fromString("abde@")},
+                        new Object[] {StringData.fromString("abd_")},
+                        new Object[] {StringData.fromString("abd_%")});
+        List<Boolean> expectedForValues1 =
+                Arrays.asList(false, false, false, true, true, true, true, true, true);
+        List<Long> rowCountList1 = Arrays.asList(0L, 3L, 3L, 3L);
+        List<FieldStats[]> statsList1 =
+                Arrays.asList(
+                        new FieldStats[] {new FieldStats(null, null, 0L)},
+                        new FieldStats[] {new FieldStats(null, null, 3L)},
+                        new FieldStats[] {
+                            new FieldStats(
+                                    StringData.fromString("ab"),
+                                    StringData.fromString("abc123"),
+                                    1L)
+                        },
+                        new FieldStats[] {
+                            new FieldStats(
+                                    StringData.fromString("abc"), StringData.fromString("abe"), 1L)
+                        });
+        List<Boolean> expectedForStats1 = Arrays.asList(false, false, false, true);
+
+        CallExpression expr2 = // LIKE 'test=_%' with '=' as the escape character
+                call(
+                        BuiltInFunctionDefinitions.LIKE,
+                        field(0, STRING()),
+                        literal("test=_%", STRING()),
+                        literal("=", STRING()));
+        List<Object[]> valuesList2 =
+                Arrays.asList(
+                        new Object[] {StringData.fromString("test%")},
+                        new Object[] {StringData.fromString("test_123")},
+                        new Object[] {StringData.fromString("test_%")},
+                        new Object[] {StringData.fromString("test__")});
+        List<Boolean> expectedForValues2 = Arrays.asList(false, true, true, true);
+        List<Long> rowCountList2 = Collections.singletonList(3L);
+        List<FieldStats[]> statsList2 =
+                Collections.singletonList(
+                        new FieldStats[] {
+                            new FieldStats(
+                                    StringData.fromString("test_123"),
+                                    StringData.fromString("test_789"),
+                                    0L)
+                        });
+        List<Boolean> expectedForStats2 = Collections.singletonList(true);
+
+        // currently, SQL wildcards '[]' and '[^]' are deemed as normal characters in Flink
+        CallExpression expr3 =
+                call(
+                        BuiltInFunctionDefinitions.LIKE,
+                        field(0, STRING()),
+                        literal("[a-c]xyz%", STRING()));
+        List<Object[]> valuesList3 =
+                Arrays.asList(
+                        new Object[] {StringData.fromString("axyz")},
+                        new Object[] {StringData.fromString("bxyz")},
+                        new Object[] {StringData.fromString("cxyz")},
+                        new Object[] {StringData.fromString("[a-c]xyz")});
+        List<Boolean> expectedForValues3 = Arrays.asList(false, false, false, true);
+        List<Long> rowCountList3 = Collections.singletonList(3L);
+        List<FieldStats[]> statsList3 =
+                Collections.singletonList(
+                        new FieldStats[] {
+                            new FieldStats(
+                                    StringData.fromString("[a-c]xyz"),
+                                    StringData.fromString("[a-c]xyzz"),
+                                    0L)
+                        });
+        List<Boolean> expectedForStats3 = Collections.singletonList(true);
+
+        CallExpression expr4 = // '[^a-d]' is likewise expected to match only literally
+                call(
+                        BuiltInFunctionDefinitions.LIKE,
+                        field(0, STRING()),
+                        literal("[^a-d]xyz%", STRING()));
+        List<Object[]> valuesList4 =
+                Arrays.asList(
+                        new Object[] {StringData.fromString("exyz")},
+                        new Object[] {StringData.fromString("fxyz")},
+                        new Object[] {StringData.fromString("axyz")},
+                        new Object[] {StringData.fromString("[^a-d]xyz")});
+        List<Boolean> expectedForValues4 = Arrays.asList(false, false, false, true);
+        List<Long> rowCountList4 = Collections.singletonList(3L);
+        List<FieldStats[]> statsList4 =
+                Collections.singletonList(
+                        new FieldStats[] {
+                            new FieldStats(
+                                    StringData.fromString("[^a-d]xyz"),
+                                    StringData.fromString("[^a-d]xyzz"),
+                                    1L)
+                        });
+        List<Boolean> expectedForStats4 = Collections.singletonList(true);
+
+        return Stream.of( // one Arguments per expression, in testStartsWith parameter order
+                Arguments.of(
+                        expr1,
+                        valuesList1,
+                        expectedForValues1,
+                        rowCountList1,
+                        statsList1,
+                        expectedForStats1),
+                Arguments.of(
+                        expr2,
+                        valuesList2,
+                        expectedForValues2,
+                        rowCountList2,
+                        statsList2,
+                        expectedForStats2),
+                Arguments.of(
+                        expr3,
+                        valuesList3,
+                        expectedForValues3,
+                        rowCountList3,
+                        statsList3,
+                        expectedForStats3),
+                Arguments.of(
+                        expr4,
+                        valuesList4,
+                        expectedForValues4,
+                        rowCountList4,
+                        statsList4,
+                        expectedForStats4));
+    }
+
+    @Test
+    public void testUnsupportedExpression() { // conversion of the AND below must throw
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.AND,
+                        call(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                field(0, DataTypes.INT()),
+                                literal(3)),
+                        call(
+                                BuiltInFunctionDefinitions.SIMILAR, // presumably the unsupported part
+                                field(1, DataTypes.INT()),
+                                literal(5)));
+        assertThatThrownBy(
+                        () ->
+                                expression.accept(
+                                        new PredicateConverter(
+                                                RowType.of(new IntType(), new IntType()))))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedStartsPatternForLike() { // every pattern must be rejected
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+        // starts pattern with '_' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("abc_", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // starts pattern like 'abc%xyz' or 'abc_xyz'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("abc%xyz", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("abc_xyz", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "=%abc=%%xyz=_",
+                                                        STRING()), // matches "%abc%(?s:.*)xyz_"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=%%xyz",
+                                                        STRING()), // matches "abc%(?s:.*)xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=%_xyz",
+                                                        STRING()), // matches "abc%.xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=_%xyz",
+                                                        STRING()), // matches "abc_(?s:.*)xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=__xyz",
+                                                        STRING()), // matches "abc_.xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // starts pattern with wildcard '%' at the beginning to escape
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("=%%", STRING()), // matches "%(?s:.*)"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedEndsPatternForLike() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+        // ends-with patterns must be rejected as unsupported; first case: '%' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("%456", STRING())) // matches "(?s:.*)456"
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // ends pattern with '_' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("_456", STRING())) // matches ".456"
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // ends pattern with '[]' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("_[456]", STRING())) // matches ".[456]"
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%[h-m]",
+                                                        STRING())) // matches "(?s:.*)[h-m]"
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // ends pattern with '[^]' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%[^h-m]",
+                                                        STRING())) // matches "(?s:.*)[^h-m]"
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("_[^xyz]", STRING())) // matches ".[^xyz]"
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // ends pattern with '%' as wildcard, followed by an escaped '%' or '_'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%=%456",
+                                                        STRING()), // matches "(?s:.*)%456"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%=_456",
+                                                        STRING()), // matches "(?s:.*)_456"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // ends pattern with '_' as wildcard, followed by an escaped '_'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("_=_456", STRING()), // matches "._456"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedEqualsPatternForLike() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+        // equals patterns must be rejected as unsupported; first case: no escape character
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("123456", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // equals pattern with an escaped '%'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("12=%45", STRING()), // equals "12%45"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // equals pattern with an escaped '_'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("12=_45", STRING()), // equals "12_45"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedMiddlePatternForLike() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+        // middle patterns must be rejected as unsupported; first case: '%' on both sides
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("%345%", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // middle pattern with '_' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("_345_", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // middle pattern with both '%' and '_' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("_345%", STRING())) // matches ".345(?s:.*)"
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("%345_", STRING())) // matches "(?s:.*)345."
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // middle pattern with '[]' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%[a-c]_",
+                                                        STRING())) // matches "(?s:.*)[a-c]."
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // middle pattern with '[^]' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%[^abc]_",
+                                                        STRING())) // matches "(?s:.*)[^abc]."
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // middle pattern with an escaped '%'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%34=%5%",
+                                                        STRING()), // matches "(?s:.*)34%5(?s:.*)"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+        // middle pattern with an escaped '_'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "%34=_5%",
+                                                        STRING()), // matches "(?s:.*)34_5(?s:.*)"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedType() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+        DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.EQUALS,
+                        field(0, structType),
+                        literal(Row.of(1), structType)); // field and literal are both ROW-typed
+        assertThatThrownBy(() -> expression.accept(converter))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    private static FieldReferenceExpression field(int i, DataType type) {
+        return new FieldReferenceExpression("name", type, 0, i); // i is presumably the field index
+    }
+
+    private static CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
+        return new CallExpression(function, Arrays.asList(args), DataTypes.BOOLEAN());
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 36b8ba8e..ce15619f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -18,35 +18,15 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.store.format.FieldStats;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
-import static org.apache.flink.table.api.DataTypes.STRING;
-import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link Predicate}s. */
 public class PredicateTest {
@@ -54,9 +34,7 @@ public class PredicateTest {
     @Test
     public void testEqual() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(BuiltInFunctionDefinitions.EQUALS, field(0, DataTypes.INT()), literal(5));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.equal(0, 5);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -74,12 +52,7 @@ public class PredicateTest {
     @Test
     public void testEqualNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.EQUALS,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.equal(0, null);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -92,9 +65,7 @@ public class PredicateTest {
     @Test
     public void testNotEqual() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(BuiltInFunctionDefinitions.NOT_EQUALS, field(0, DataTypes.INT()), literal(5));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.notEqual(0, 5);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -113,12 +84,7 @@ public class PredicateTest {
     @Test
     public void testNotEqualNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.NOT_EQUALS,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.notEqual(0, null);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -131,12 +97,7 @@ public class PredicateTest {
     @Test
     public void testGreater() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.GREATER_THAN,
-                        field(0, DataTypes.INT()),
-                        literal(5));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.greaterThan(0, 5);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -156,12 +117,7 @@ public class PredicateTest {
     @Test
     public void testGreaterNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.GREATER_THAN,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.greaterThan(0, null);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -174,12 +130,7 @@ public class PredicateTest {
     @Test
     public void testGreaterOrEqual() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
-                        field(0, DataTypes.INT()),
-                        literal(5));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.greaterOrEqual(0, 5);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -199,12 +150,7 @@ public class PredicateTest {
     @Test
     public void testGreaterOrEqualNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.greaterOrEqual(0, null);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -217,9 +163,7 @@ public class PredicateTest {
     @Test
     public void testLess() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(BuiltInFunctionDefinitions.LESS_THAN, field(0, DataTypes.INT()), literal(5));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.lessThan(0, 5);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -238,12 +182,7 @@ public class PredicateTest {
     @Test
     public void testLessNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.LESS_THAN,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.lessThan(0, null);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -256,12 +195,7 @@ public class PredicateTest {
     @Test
     public void testLessOrEqual() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
-                        field(0, DataTypes.INT()),
-                        literal(5));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.lessOrEqual(0, 5);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -280,12 +214,7 @@ public class PredicateTest {
     @Test
     public void testLessOrEqualNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.lessOrEqual(0, null);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -298,12 +227,7 @@ public class PredicateTest {
     @Test
     public void testIsNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.IS_NULL,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.isNull(0);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(true);
@@ -317,12 +241,7 @@ public class PredicateTest {
     @Test
     public void testIsNotNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.IS_NOT_NULL,
-                        field(0, DataTypes.INT()),
-                        literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = builder.isNotNull(0);
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -335,21 +254,80 @@ public class PredicateTest {
         assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNull(0));
     }
 
+    @Test
+    public void testIn() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        Predicate predicate = builder.in(0, Arrays.asList(1, 3)); // field 0 IN (1, 3)
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false); // null does not match
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false); // presumably an all-null column (min/max null) - TODO confirm
+    }
+
+    @Test
+    public void testInNull() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        Predicate predicate = builder.in(0, Arrays.asList(1, null, 3)); // IN list contains a null
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false); // null does not match
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false); // presumably an all-null column (min/max null) - TODO confirm
+    }
+
+    @Test
+    public void testNotIn() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        Predicate predicate = builder.notIn(0, Arrays.asList(1, 3)); // field 0 NOT IN (1, 3)
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false); // null does not match
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false); // presumably an all-null column (min/max null) - TODO confirm
+    }
+
+    @Test
+    public void testNotInNull() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        Predicate predicate = builder.notIn(0, Arrays.asList(1, null, 3)); // list contains a null
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(false); // true in testNotIn
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false); // null does not match
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false); // presumably an all-null column (min/max null) - TODO confirm
+    }
+
     @Test
     public void testAnd() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.AND,
-                        call(
-                                BuiltInFunctionDefinitions.EQUALS,
-                                field(0, DataTypes.INT()),
-                                literal(3)),
-                        call(
-                                BuiltInFunctionDefinitions.EQUALS,
-                                field(1, DataTypes.INT()),
-                                literal(5)));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = PredicateBuilder.and(builder.equal(0, 3), builder.equal(1, 5));
 
         assertThat(predicate.test(new Object[] {4, 5})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(false);
@@ -385,18 +363,7 @@ public class PredicateTest {
     @Test
     public void testOr() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.OR,
-                        call(
-                                BuiltInFunctionDefinitions.EQUALS,
-                                field(0, DataTypes.INT()),
-                                literal(3)),
-                        call(
-                                BuiltInFunctionDefinitions.EQUALS,
-                                field(1, DataTypes.INT()),
-                                literal(5)));
-        Predicate predicate = expression.accept(new PredicateConverter(builder));
+        Predicate predicate = PredicateBuilder.or(builder.equal(0, 3), builder.equal(1, 5));
 
         assertThat(predicate.test(new Object[] {4, 6})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(true);
@@ -428,535 +395,4 @@ public class PredicateTest {
         assertThat(predicate.negate().orElse(null))
                 .isEqualTo(PredicateBuilder.and(builder.notEqual(0, 3), builder.notEqual(1, 5)));
     }
-
-    @MethodSource("provideLikeExpressions")
-    @ParameterizedTest
-    public void testStartsWith(
-            CallExpression callExpression,
-            List<Object[]> valuesList,
-            List<Boolean> expectedForValues,
-            List<Long> rowCountList,
-            List<FieldStats[]> statsList,
-            List<Boolean> expectedForStats) {
-        Predicate predicate =
-                callExpression.accept(new PredicateConverter(RowType.of(new VarCharType())));
-        IntStream.range(0, valuesList.size())
-                .forEach(
-                        i ->
-                                assertThat(predicate.test(valuesList.get(i)))
-                                        .isEqualTo(expectedForValues.get(i)));
-        IntStream.range(0, rowCountList.size())
-                .forEach(
-                        i ->
-                                assertThat(predicate.test(rowCountList.get(i), statsList.get(i)))
-                                        .isEqualTo(expectedForStats.get(i)));
-    }
-
-    @Test
-    public void testUnsupportedExpression() {
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.AND,
-                        call(
-                                BuiltInFunctionDefinitions.EQUALS,
-                                field(0, DataTypes.INT()),
-                                literal(3)),
-                        call(
-                                BuiltInFunctionDefinitions.SIMILAR,
-                                field(1, DataTypes.INT()),
-                                literal(5)));
-        assertThatThrownBy(
-                        () ->
-                                expression.accept(
-                                        new PredicateConverter(
-                                                RowType.of(new IntType(), new IntType()))))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-    }
-
-    @Test
-    public void testUnsupportedStartsPatternForLike() {
-        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
-        // starts pattern with '_' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("abc_", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // starts pattern like 'abc%xyz' or 'abc_xyz'
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("abc%xyz", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("abc_xyz", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "=%abc=%%xyz=_",
-                                                        STRING()), // matches "%abc%(?s:.*)xyz_"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "abc=%%xyz",
-                                                        STRING()), // matches "abc%(?s:.*)xyz"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "abc=%_xyz",
-                                                        STRING()), // matches "abc%.xyz"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "abc=_%xyz",
-                                                        STRING()), // matches "abc_(?s:.*)xyz"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "abc=__xyz",
-                                                        STRING()), // matches "abc_.xyz"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // starts pattern with wildcard '%' at the beginning to escape
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("=%%", STRING()), // matches "%(?s:.*)"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-    }
-
-    @Test
-    public void testUnsupportedEndsPatternForLike() {
-        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
-        // ends pattern with '%' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("%456", STRING())) // matches "(?s:.*)456"
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // ends pattern with '_' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("_456", STRING())) // matches ".456"
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // ends pattern with '[]' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("_[456]", STRING())) // matches ".[456]"
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%[h-m]",
-                                                        STRING())) // matches "(?s:.*)[h-m]"
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // ends pattern with '[^]' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%[^h-m]",
-                                                        STRING())) // matches "(?s:.*)[^h-m]"
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("_[^xyz]", STRING())) // matches ".[^xyz]"
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // ends pattern escape wildcard '%'
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%=%456",
-                                                        STRING()), // matches "(?s:.*)%456"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%=_456",
-                                                        STRING()), // matches "(?s:.*)_456"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // ends pattern escape wildcard '_'
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("_=_456", STRING()), // matches "._456"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-    }
-
-    @Test
-    public void testUnsupportedEqualsPatternForLike() {
-        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
-        // equals pattern
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("123456", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // equals pattern escape '%'
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("12=%45", STRING()), // equals "12%45"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // equals pattern escape '_'
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("12=_45", STRING()), // equals "12_45"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-    }
-
-    @Test
-    public void testUnsupportedMiddlePatternForLike() {
-        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
-        // middle pattern with '%' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("%345%", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // middle pattern with '_' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("_345_", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // middle pattern with both '%' and '_' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("_345%", STRING())) // matches ".345(?s:.*)"
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal("%345_", STRING())) // matches "(?s:.*)345."
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // middle pattern with '[]' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%[a-c]_",
-                                                        STRING())) // matches "(?s:.*)[a-c]."
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // middle pattern with '[^]' as wildcard
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%[^abc]_",
-                                                        STRING())) // matches "(?s:.*)[^abc]."
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // middle pattern escape '%'
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%34=%5%",
-                                                        STRING()), // matches "(?s:.*)34%5(.*)"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
-        // middle pattern escape '_'
-        assertThatThrownBy(
-                        () ->
-                                call(
-                                                BuiltInFunctionDefinitions.LIKE,
-                                                field(0, STRING()),
-                                                literal(
-                                                        "%34=_5%",
-                                                        STRING()), // matches "(?s:.*)34_5(.*)"
-                                                literal("=", STRING()))
-                                        .accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-    }
-
-    @Test
-    public void testUnsupportedType() {
-        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
-        DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
-        CallExpression expression =
-                call(
-                        BuiltInFunctionDefinitions.EQUALS,
-                        field(0, structType),
-                        literal(Row.of(1), structType));
-        assertThatThrownBy(() -> expression.accept(converter))
-                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-    }
-
-    public static Stream<Arguments> provideLikeExpressions() {
-        CallExpression expr1 =
-                call(
-                        BuiltInFunctionDefinitions.LIKE,
-                        field(0, STRING()),
-                        literal("abd%", STRING()));
-        List<Object[]> valuesList1 =
-                Arrays.asList(
-                        new Object[] {null},
-                        new Object[] {StringData.fromString("a")},
-                        new Object[] {StringData.fromString("ab")},
-                        new Object[] {StringData.fromString("abd")},
-                        new Object[] {StringData.fromString("abd%")},
-                        new Object[] {StringData.fromString("abd1")},
-                        new Object[] {StringData.fromString("abde@")},
-                        new Object[] {StringData.fromString("abd_")},
-                        new Object[] {StringData.fromString("abd_%")});
-        List<Boolean> expectedForValues1 =
-                Arrays.asList(false, false, false, true, true, true, true, true, true);
-        List<Long> rowCountList1 = Arrays.asList(0L, 3L, 3L, 3L);
-        List<FieldStats[]> statsList1 =
-                Arrays.asList(
-                        new FieldStats[] {new FieldStats(null, null, 0L)},
-                        new FieldStats[] {new FieldStats(null, null, 3L)},
-                        new FieldStats[] {
-                            new FieldStats(
-                                    StringData.fromString("ab"),
-                                    StringData.fromString("abc123"),
-                                    1L)
-                        },
-                        new FieldStats[] {
-                            new FieldStats(
-                                    StringData.fromString("abc"), StringData.fromString("abe"), 1L)
-                        });
-        List<Boolean> expectedForStats1 = Arrays.asList(false, false, false, true);
-
-        CallExpression expr2 =
-                call(
-                        BuiltInFunctionDefinitions.LIKE,
-                        field(0, STRING()),
-                        literal("test=_%", STRING()),
-                        literal("=", STRING()));
-        List<Object[]> valuesList2 =
-                Arrays.asList(
-                        new Object[] {StringData.fromString("test%")},
-                        new Object[] {StringData.fromString("test_123")},
-                        new Object[] {StringData.fromString("test_%")},
-                        new Object[] {StringData.fromString("test__")});
-        List<Boolean> expectedForValues2 = Arrays.asList(false, true, true, true);
-        List<Long> rowCountList2 = Collections.singletonList(3L);
-        List<FieldStats[]> statsList2 =
-                Collections.singletonList(
-                        new FieldStats[] {
-                            new FieldStats(
-                                    StringData.fromString("test_123"),
-                                    StringData.fromString("test_789"),
-                                    0L)
-                        });
-        List<Boolean> expectedForStats2 = Collections.singletonList(true);
-
-        // currently, SQL wildcards '[]' and '[^]' are deemed as normal characters in Flink
-        CallExpression expr3 =
-                call(
-                        BuiltInFunctionDefinitions.LIKE,
-                        field(0, STRING()),
-                        literal("[a-c]xyz%", STRING()));
-        List<Object[]> valuesList3 =
-                Arrays.asList(
-                        new Object[] {StringData.fromString("axyz")},
-                        new Object[] {StringData.fromString("bxyz")},
-                        new Object[] {StringData.fromString("cxyz")},
-                        new Object[] {StringData.fromString("[a-c]xyz")});
-        List<Boolean> expectedForValues3 = Arrays.asList(false, false, false, true);
-        List<Long> rowCountList3 = Collections.singletonList(3L);
-        List<FieldStats[]> statsList3 =
-                Collections.singletonList(
-                        new FieldStats[] {
-                            new FieldStats(
-                                    StringData.fromString("[a-c]xyz"),
-                                    StringData.fromString("[a-c]xyzz"),
-                                    0L)
-                        });
-        List<Boolean> expectedForStats3 = Collections.singletonList(true);
-
-        CallExpression expr4 =
-                call(
-                        BuiltInFunctionDefinitions.LIKE,
-                        field(0, STRING()),
-                        literal("[^a-d]xyz%", STRING()));
-        List<Object[]> valuesList4 =
-                Arrays.asList(
-                        new Object[] {StringData.fromString("exyz")},
-                        new Object[] {StringData.fromString("fxyz")},
-                        new Object[] {StringData.fromString("axyz")},
-                        new Object[] {StringData.fromString("[^a-d]xyz")});
-        List<Boolean> expectedForValues4 = Arrays.asList(false, false, false, true);
-        List<Long> rowCountList4 = Collections.singletonList(3L);
-        List<FieldStats[]> statsList4 =
-                Collections.singletonList(
-                        new FieldStats[] {
-                            new FieldStats(
-                                    StringData.fromString("[^a-d]xyz"),
-                                    StringData.fromString("[^a-d]xyzz"),
-                                    1L)
-                        });
-        List<Boolean> expectedForStats4 = Collections.singletonList(true);
-
-        return Stream.of(
-                Arguments.of(
-                        expr1,
-                        valuesList1,
-                        expectedForValues1,
-                        rowCountList1,
-                        statsList1,
-                        expectedForStats1),
-                Arguments.of(
-                        expr2,
-                        valuesList2,
-                        expectedForValues2,
-                        rowCountList2,
-                        statsList2,
-                        expectedForStats2),
-                Arguments.of(
-                        expr3,
-                        valuesList3,
-                        expectedForValues3,
-                        rowCountList3,
-                        statsList3,
-                        expectedForStats3),
-                Arguments.of(
-                        expr4,
-                        valuesList4,
-                        expectedForValues4,
-                        rowCountList4,
-                        statsList4,
-                        expectedForStats4));
-    }
-
-    private static FieldReferenceExpression field(int i, DataType type) {
-        return new FieldReferenceExpression("name", type, 0, i);
-    }
-
-    private static CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
-        return new CallExpression(function, Arrays.asList(args), DataTypes.BOOLEAN());
-    }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index 24406d79..0ab02e1a 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -174,9 +174,7 @@ public class SearchArgumentToPredicateConverterTest {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg =
                 builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L).build();
-        Predicate expected =
-                PredicateBuilder.or(
-                        BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
+        Predicate expected = BUILDER.in(1, Arrays.asList(100L, 200L, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -185,9 +183,7 @@ public class SearchArgumentToPredicateConverterTest {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg =
                 builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, null, 300L).build();
-        Predicate expected =
-                PredicateBuilder.or(
-                        BUILDER.equal(1, 100L), BUILDER.equal(1, null), BUILDER.equal(1, 300L));
+        Predicate expected = BUILDER.in(1, Arrays.asList(100L, null, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -199,11 +195,7 @@ public class SearchArgumentToPredicateConverterTest {
                         .in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L)
                         .end()
                         .build();
-        Predicate expected =
-                PredicateBuilder.and(
-                        BUILDER.notEqual(1, 100L),
-                        BUILDER.notEqual(1, 200L),
-                        BUILDER.notEqual(1, 300L));
+        Predicate expected = BUILDER.notIn(1, Arrays.asList(100L, 200L, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -215,11 +207,7 @@ public class SearchArgumentToPredicateConverterTest {
                         .in("f_bigint", PredicateLeaf.Type.LONG, 100L, null, 300L)
                         .end()
                         .build();
-        Predicate expected =
-                PredicateBuilder.and(
-                        BUILDER.notEqual(1, 100L),
-                        BUILDER.notEqual(1, null),
-                        BUILDER.notEqual(1, 300L));
+        Predicate expected = BUILDER.notIn(1, Arrays.asList(100L, null, 300L));
         assertExpected(sarg, expected);
     }
 
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
index 6213ca39..285911bb 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
 import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.In;
 import org.apache.spark.sql.sources.IsNotNull;
 import org.apache.spark.sql.sources.IsNull;
 import org.apache.spark.sql.sources.LessThan;
@@ -36,6 +37,9 @@ import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.Or;
 import org.apache.spark.sql.sources.StringStartsWith;
 
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
 
 /** Conversion from {@link Filter} to {@link Predicate}. */
@@ -76,6 +80,14 @@ public class SparkFilterConverter {
             int index = fieldIndex(lt.attribute());
             Object literal = convertLiteral(index, lt.value());
             return builder.lessOrEqual(index, literal);
+        } else if (filter instanceof In) {
+            In in = (In) filter;
+            int index = fieldIndex(in.attribute());
+            return builder.in(
+                    index,
+                    Arrays.stream(in.values())
+                            .map(v -> convertLiteral(index, v))
+                            .collect(Collectors.toList()));
         } else if (filter instanceof IsNull) {
             return builder.isNull(fieldIndex(((IsNull) filter).attribute()));
         } else if (filter instanceof IsNotNull) {
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
index f2c3a711..8b35a31f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.GreaterThan;
 import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.In;
 import org.apache.spark.sql.sources.IsNotNull;
 import org.apache.spark.sql.sources.IsNull;
 import org.apache.spark.sql.sources.LessThan;
@@ -41,6 +42,7 @@ import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -115,6 +117,11 @@ public class SparkFilterConverterTest {
         Predicate expectedEqNull = builder.equal(0, null);
         Predicate actualEqNull = converter.convert(eqNull);
         assertThat(actualEqNull).isEqualTo(expectedEqNull);
+
+        In in = In.apply(field, new Object[] {1, null, 2});
+        Predicate expectedIn = builder.in(0, Arrays.asList(1, null, 2));
+        Predicate actualIn = converter.convert(in);
+        assertThat(actualIn).isEqualTo(expectedIn);
     }
 
     @Test