Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/14 02:55:12 UTC

[flink-table-store] branch master updated: [FLINK-28285] Push filter into orc reader

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 088955f7 [FLINK-28285] Push filter into orc reader
088955f7 is described below

commit 088955f72bddf8134934dfd21152e1933c0cab77
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jul 14 10:55:08 2022 +0800

    [FLINK-28285] Push filter into orc reader
    
    This closes #212
---
 .../flink/table/store/file/predicate/And.java      |   0
 .../table/store/file/predicate/CompareUtils.java   |   0
 .../store/file/predicate/CompoundPredicate.java    |   0
 .../flink/table/store/file/predicate/Equal.java    |   0
 .../table/store/file/predicate/GreaterOrEqual.java |   0
 .../table/store/file/predicate/GreaterThan.java    |   0
 .../flink/table/store/file/predicate/In.java       |   0
 .../table/store/file/predicate/IsNotNull.java      |   0
 .../flink/table/store/file/predicate/IsNull.java   |   0
 .../table/store/file/predicate/LeafFunction.java   |   0
 .../table/store/file/predicate/LeafPredicate.java  |  33 ++-
 .../store/file/predicate/LeafUnaryFunction.java    |   0
 .../table/store/file/predicate/LessOrEqual.java    |   0
 .../flink/table/store/file/predicate/LessThan.java |   0
 .../flink/table/store/file/predicate/NotEqual.java |   0
 .../flink/table/store/file/predicate/NotIn.java    |   0
 .../predicate/NullFalseLeafBinaryFunction.java     |   0
 .../flink/table/store/file/predicate/Or.java       |   0
 .../table/store/file/predicate/Predicate.java      |   0
 .../store/file/predicate/PredicateBuilder.java     |  49 +++-
 .../store/file/predicate/PredicateConverter.java   |   0
 .../table/store/file/predicate/StartsWith.java     |   0
 .../flink/table/store/format/FileFormat.java       |   4 +-
 .../store/connector/source/FileStoreSource.java    |   3 +
 .../source/TestChangelogDataReadWrite.java         |   6 +
 flink-table-store-core/pom.xml                     |  42 ++++
 .../table/store/file/data/DataFileReader.java      |  10 +-
 .../file/operation/AppendOnlyFileStoreRead.java    |  13 +-
 .../table/store/file/operation/FileStoreRead.java  |   3 +
 .../file/operation/KeyValueFileStoreRead.java      |  14 +-
 .../store/table/AppendOnlyFileStoreTable.java      |   6 +
 .../table/ChangelogValueCountFileStoreTable.java   |   6 +
 .../table/ChangelogWithKeyFileStoreTable.java      |  32 ++-
 .../flink/table/store/table/source/TableRead.java  |   3 +-
 .../file/format/FileStatsExtractingAvroFormat.java |   4 +-
 .../store/file/format/FlushingFileFormat.java      |   4 +-
 .../table/store/file/predicate/PredicateTest.java  | 111 ++++++++++
 .../store/table/AppendOnlyFileStoreTableTest.java  |   1 -
 .../ChangelogValueCountFileStoreTableTest.java     |   1 -
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  42 +++-
 .../table/store/table/FileStoreTableTestBase.java  |  29 +++
 .../table/store/table/WritePreemptMemoryTest.java  |   5 +-
 .../table/store/format/avro/AvroFileFormat.java    |   4 +-
 .../table/store/format/orc/OrcFileFormat.java      |   9 +-
 .../table/store/format/orc/OrcFilterConverter.java | 246 +++++++++++++++++++++
 .../store/format/orc/OrcFilterConverterTest.java   |  80 +++++++
 .../table/store/mapred/TableStoreInputFormat.java  |   7 +-
 .../SearchArgumentToPredicateConverterTest.java    |  40 +++-
 .../table/store/spark/SparkReaderFactory.java      |  19 +-
 .../apache/flink/table/store/spark/SparkScan.java  |   2 +-
 .../store/spark/SparkFilterConverterTest.java      |  10 +
 51 files changed, 785 insertions(+), 53 deletions(-)
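
What this commit does, in short: the predicate classes move from
flink-table-store-core to flink-table-store-common, FileFormat.createReaderFactory
takes the store's own Predicate type instead of Flink's ResolvedExpression,
withFilter(Predicate) is threaded through FileStoreRead/TableRead, and a new
OrcFilterConverter translates predicates into ORC search arguments. A minimal
sketch of the ORC-side mechanism this builds on, assuming the stock
SearchArgument builder from the ORC storage-api (illustrative only, not the
converter's actual code):

    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

    public class SargSketch {
        // ORC-side equivalent of a LeafPredicate such as `k = 30`. Note the
        // filter binds by field NAME, which is why LeafPredicate gains one below.
        static SearchArgument equalsFilter(String fieldName, long literal) {
            return SearchArgumentFactory.newBuilder()
                    .startAnd()
                    .equals(fieldName, PredicateLeaf.Type.LONG, literal)
                    .end()
                    .build();
        }
    }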

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/And.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/And.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
similarity index 80%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 17a52f76..5e48776a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -26,8 +26,6 @@ import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -42,15 +40,21 @@ public class LeafPredicate implements Predicate {
 
     private final LeafFunction function;
     private final LogicalType type;
-    private final int index;
+    private final int fieldIndex;
+    private final String fieldName;
 
     private transient List<Object> literals;
 
     public LeafPredicate(
-            LeafFunction function, LogicalType type, int index, List<Object> literals) {
+            LeafFunction function,
+            LogicalType type,
+            int fieldIndex,
+            String fieldName,
+            List<Object> literals) {
         this.function = function;
         this.type = type;
-        this.index = index;
+        this.fieldIndex = fieldIndex;
+        this.fieldName = fieldName;
         this.literals = literals;
     }
 
@@ -58,13 +62,16 @@ public class LeafPredicate implements Predicate {
         return function;
     }
 
-    @Nullable
     public LogicalType type() {
         return type;
     }
 
     public int index() {
-        return index;
+        return fieldIndex;
+    }
+
+    public String fieldName() {
+        return fieldName;
     }
 
     public List<Object> literals() {
@@ -73,17 +80,18 @@ public class LeafPredicate implements Predicate {
 
     @Override
     public boolean test(Object[] values) {
-        return function.test(type, values[index], literals);
+        return function.test(type, values[fieldIndex], literals);
     }
 
     @Override
     public boolean test(long rowCount, FieldStats[] fieldStats) {
-        return function.test(type, rowCount, fieldStats[index], literals);
+        return function.test(type, rowCount, fieldStats[fieldIndex], literals);
     }
 
     @Override
     public Optional<Predicate> negate() {
-        return function.negate().map(negate -> new LeafPredicate(negate, type, index, literals));
+        return function.negate()
+                .map(negate -> new LeafPredicate(negate, type, fieldIndex, fieldName, literals));
     }
 
     @Override
@@ -95,7 +103,8 @@ public class LeafPredicate implements Predicate {
             return false;
         }
         LeafPredicate that = (LeafPredicate) o;
-        return index == that.index
+        return fieldIndex == that.fieldIndex
+                && Objects.equals(fieldName, that.fieldName)
                 && Objects.equals(function, that.function)
                 && Objects.equals(type, that.type)
                 && Objects.equals(literals, that.literals);
@@ -103,7 +112,7 @@ public class LeafPredicate implements Predicate {
 
     @Override
     public int hashCode() {
-        return Objects.hash(function, type, index, literals);
+        return Objects.hash(function, type, fieldIndex, fieldName, literals);
     }
 
     private ListSerializer<Object> objectsSerializer() {
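
Net effect of the LeafPredicate change: the predicate now carries the field
name alongside the index, so name-based readers such as ORC can bind a filter
without trusting the index. A hedged sketch of the new five-argument
constructor (Equal.INSTANCE and the literal values are assumed here for
illustration):

    LeafPredicate keyEquals30 =
            new LeafPredicate(
                    Equal.INSTANCE,                  // leaf function
                    new IntType(),                   // field type
                    1,                               // fieldIndex
                    "k",                             // fieldName (new)
                    Collections.singletonList(30));  // literals
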
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
similarity index 85%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index 035809d9..a1b86a2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -39,6 +41,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import static java.util.Collections.singletonList;
 
@@ -88,19 +91,32 @@ public class PredicateBuilder {
     }
 
     public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
-        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, singletonList(literal));
+        RowType.RowField field = rowType.getFields().get(idx);
+        return new LeafPredicate(
+                function, field.getType(), idx, field.getName(), singletonList(literal));
     }
 
     public Predicate leaf(LeafUnaryFunction function, int idx) {
-        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, Collections.emptyList());
+        RowType.RowField field = rowType.getFields().get(idx);
+        return new LeafPredicate(
+                function, field.getType(), idx, field.getName(), Collections.emptyList());
     }
 
     public Predicate in(int idx, List<Object> literals) {
-        return new LeafPredicate(In.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+        if (literals.size() > 20) {
+            RowType.RowField field = rowType.getFields().get(idx);
+            return new LeafPredicate(In.INSTANCE, field.getType(), idx, field.getName(), literals);
+        }
+
+        List<Predicate> equals = new ArrayList<>(literals.size());
+        for (Object literal : literals) {
+            equals.add(equal(idx, literal));
+        }
+        return or(equals);
     }
 
     public Predicate notIn(int idx, List<Object> literals) {
-        return new LeafPredicate(NotIn.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+        return in(idx, literals).negate().get();
     }
 
     public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) {
@@ -140,13 +156,19 @@ public class PredicateBuilder {
                 .get();
     }
 
-    public static List<Predicate> splitAnd(Predicate predicate) {
+    public static List<Predicate> splitAnd(@Nullable Predicate predicate) {
+        if (predicate == null) {
+            return Collections.emptyList();
+        }
         List<Predicate> result = new ArrayList<>();
         splitCompound(And.INSTANCE, predicate, result);
         return result;
     }
 
-    public static List<Predicate> splitOr(Predicate predicate) {
+    public static List<Predicate> splitOr(@Nullable Predicate predicate) {
+        if (predicate == null) {
+            return Collections.emptyList();
+        }
         List<Predicate> result = new ArrayList<>();
         splitCompound(Or.INSTANCE, predicate, result);
         return result;
@@ -268,10 +290,25 @@ public class PredicateBuilder {
                                 leafPredicate.function(),
                                 leafPredicate.type(),
                                 mapped,
+                                leafPredicate.fieldName(),
                                 leafPredicate.literals()));
             } else {
                 return Optional.empty();
             }
         }
     }
+
+    public static boolean containsFields(Predicate predicate, Set<String> fields) {
+        if (predicate instanceof CompoundPredicate) {
+            for (Predicate child : ((CompoundPredicate) predicate).children()) {
+                if (containsFields(child, fields)) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            LeafPredicate leafPredicate = (LeafPredicate) predicate;
+            return fields.contains(leafPredicate.fieldName());
+        }
+    }
 }
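
Two behavioral changes in PredicateBuilder deserve a note. First, `in` with 20
or fewer literals is now expanded into an OR of equals, so each literal gets
its own stats test; only larger sets stay as a single In leaf, and `notIn` is
just the negation of `in`. Second, splitAnd/splitOr now accept null and return
an empty list. A usage sketch, mirroring the new tests:

    PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));

    Predicate small = builder.in(0, Arrays.asList(1, 3));
    // <= 20 literals: becomes (f0 = 1) OR (f0 = 3), a CompoundPredicate

    Predicate negated = builder.notIn(0, Arrays.asList(1, 3));
    // negate() of that expansion: (f0 <> 1) AND (f0 <> 3)

    List<Predicate> parts = PredicateBuilder.splitAnd(null);
    // -> empty list rather than a NullPointerException (new in this commit)
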
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
index 4fad3648..d457e258 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -26,7 +26,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
@@ -59,7 +59,7 @@ public abstract class FileFormat {
      * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
      */
     public abstract BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters);
+            RowType type, int[][] projection, List<Predicate> filters);
 
     /** Create a {@link BulkWriter.Factory} from the type. */
     public abstract BulkWriter.Factory<RowData> createWriterFactory(RowType type);
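
Since the javadoc declares the filters best-effort, a format is free to ignore
them; the two test formats further down simply forward the list. A minimal
conforming override for a format with no pushdown support might look like this
(createUnfilteredReaderFactory is a hypothetical helper):

    @Override
    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
            RowType type, int[][] projection, List<Predicate> filters) {
        // Best-effort contract: returning unfiltered rows is allowed, so the
        // caller must still evaluate the predicate on every row it receives.
        return createUnfilteredReaderFactory(type, projection);
    }
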
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 2752b64a..cd2c7a28 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -84,6 +84,9 @@ public class FileStoreSource
         if (projectedFields != null) {
             read.withProjection(projectedFields);
         }
+        if (predicate != null) {
+            read.withFilter(predicate);
+        }
         return new FileStoreSourceReader(context, read);
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 1d26808c..8c7a504b 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
@@ -108,6 +109,11 @@ public class TestChangelogDataReadWrite {
                         avro,
                         pathFactory);
         return new KeyValueTableRead(read) {
+            @Override
+            public TableRead withFilter(Predicate predicate) {
+                throw new UnsupportedOperationException();
+            }
+
             @Override
             public TableRead withProjection(int[][] projection) {
                 throw new UnsupportedOperationException();
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index c364d497..ac25fab5 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -89,6 +89,48 @@ under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index a57044f3..26cd5c65 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
@@ -37,6 +38,8 @@ import org.apache.flink.table.types.logical.RowType;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Reads {@link KeyValue}s from data files.
@@ -173,10 +176,11 @@ public class DataFileReader {
         }
 
         public DataFileReader create(BinaryRowData partition, int bucket) {
-            return create(partition, bucket, true);
+            return create(partition, bucket, true, Collections.emptyList());
         }
 
-        public DataFileReader create(BinaryRowData partition, int bucket, boolean projectKeys) {
+        public DataFileReader create(
+                BinaryRowData partition, int bucket, boolean projectKeys, List<Predicate> filters) {
             int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
             RowType projectedKeyType = projectKeys ? this.projectedKeyType : keyType;
 
@@ -188,7 +192,7 @@ public class DataFileReader {
                     schemaId,
                     projectedKeyType,
                     projectedValueType,
-                    fileFormat.createReaderFactory(recordType, projection),
+                    fileFormat.createReaderFactory(recordType, projection, filters),
                     pathFactory.createDataFilePathFactory(partition, bucket));
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index c527f415..176b7524 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.store.file.data.AppendOnlyReader;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
@@ -37,6 +38,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
 /** {@link FileStoreRead} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
 
@@ -48,6 +51,8 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
 
     private int[][] projection;
 
+    private List<Predicate> filters;
+
     public AppendOnlyFileStoreRead(
             SchemaManager schemaManager,
             long schemaId,
@@ -68,10 +73,16 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
         return this;
     }
 
+    @Override
+    public FileStoreRead<RowData> withFilter(Predicate predicate) {
+        this.filters = splitAnd(predicate);
+        return this;
+    }
+
     @Override
     public RecordReader<RowData> createReader(Split split) throws IOException {
         BulkFormat<RowData, FileSourceSplit> readerFactory =
-                fileFormat.createReaderFactory(rowType, projection);
+                fileFormat.createReaderFactory(rowType, projection, filters);
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
         List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index 43729878..f540f0cb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.source.Split;
 
@@ -30,6 +31,8 @@ import java.io.IOException;
  */
 public interface FileStoreRead<T> {
 
+    FileStoreRead<T> withFilter(Predicate predicate);
+
     /** Create a {@link RecordReader} from split. */
     RecordReader<T> createReader(Split split) throws IOException;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index de83b87c..40bd85c5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
@@ -41,6 +42,7 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.flink.table.store.file.data.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
 
 /**
  * {@link FileStoreRead} implementation for {@link
@@ -54,6 +56,8 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
 
     private int[][] keyProjectedFields;
 
+    private List<Predicate> filters;
+
     public KeyValueFileStoreRead(
             SchemaManager schemaManager,
             long schemaId,
@@ -81,11 +85,17 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
         return this;
     }
 
+    @Override
+    public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
+        this.filters = splitAnd(predicate);
+        return this;
+    }
+
     @Override
     public RecordReader<KeyValue> createReader(Split split) throws IOException {
         if (split.isIncremental()) {
             DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), true);
+                    dataFileReaderFactory.create(split.partition(), split.bucket(), true, filters);
             // Return the raw file contents without merging
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
             for (DataFileMeta file : split.files()) {
@@ -97,7 +107,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
             // in this case merge tree should merge records with same key
             // Do not project key in MergeTreeReader.
             DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), false);
+                    dataFileReaderFactory.create(split.partition(), split.bucket(), false, filters);
             MergeTreeReader reader =
                     new MergeTreeReader(
                             new IntervalPartition(split.files(), keyComparator).partition(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 16f4be28..65975cc6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -85,6 +85,12 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     public TableRead newRead() {
         AppendOnlyFileStoreRead read = store.newRead();
         return new TableRead() {
+            @Override
+            public TableRead withFilter(Predicate predicate) {
+                read.withFilter(predicate);
+                return this;
+            }
+
             @Override
             public TableRead withProjection(int[][] projection) {
                 read.withProjection(projection);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index ad90ede0..9dea5237 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -98,6 +98,12 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     public TableRead newRead() {
         return new KeyValueTableRead(store.newRead()) {
 
+            @Override
+            public TableRead withFilter(Predicate predicate) {
+                read.withFilter(predicate);
+                return this;
+            }
+
             @Override
             public TableRead withProjection(int[][] projection) {
                 read.withKeyProjection(projection);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 37a29062..a47a1557 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -50,9 +49,13 @@ import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterat
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.containsFields;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
 
@@ -142,7 +145,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
                                 tableSchema.fieldNames(),
                                 tableSchema.trimmedPrimaryKeys());
                 if (keyFilters.size() > 0) {
-                    scan.withKeyFilter(PredicateBuilder.and(keyFilters));
+                    scan.withKeyFilter(and(keyFilters));
                 }
             }
         };
@@ -150,7 +153,32 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableRead newRead() {
+        List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+        Set<String> nonPrimaryKeys =
+                tableSchema.fieldNames().stream()
+                        .filter(name -> !primaryKeys.contains(name))
+                        .collect(Collectors.toSet());
         return new KeyValueTableRead(store.newRead()) {
+
+            @Override
+            public TableRead withFilter(Predicate predicate) {
+                List<Predicate> predicates = new ArrayList<>();
+                for (Predicate sub : splitAnd(predicate)) {
+                    // TODO support value filter
+                    if (containsFields(sub, nonPrimaryKeys)) {
+                        continue;
+                    }
+
+                    // TODO Actually, the field index here can be wrong, but that
+                    // is OK for now: the ORC filter binds by name, not by index.
+                    predicates.add(sub);
+                }
+                if (predicates.size() > 0) {
+                    read.withFilter(and(predicates));
+                }
+                return this;
+            }
+
             @Override
             public TableRead withProjection(int[][] projection) {
                 read.withValueProjection(projection);
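
Why only key filters are pushed here: files in a changelog-with-key table hold
multiple versions of a row and merging happens at read time, so a value filter
evaluated per file could drop an old version whose merged result should
survive (or keep one that should not). withFilter therefore keeps only
predicates touching no non-primary-key field and silently discards the rest,
which must be re-applied downstream. Pushed filters also prune at file
granularity, returning whole candidate files unfiltered. Roughly, following
the test schema below (field 1 assumed to be in the primary key):

    table.newRead().withFilter(builder.equal(2, 300L)); // value field: dropped,
                                                        // every row still returned
    table.newRead().withFilter(builder.equal(1, 30));   // key field: pushed down,
                                                        // prunes to matching files
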
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index b8fa4061..440e3d2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.RecordReader;
 
 import java.io.IOException;
@@ -28,7 +29,7 @@ import java.util.Arrays;
 /** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
 public interface TableRead {
 
-    // TODO support filter push down
+    TableRead withFilter(Predicate predicate);
 
     default TableRead withProjection(int[] projection) {
         int[][] nestedProjection =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 29ebd0c3..692ba1b5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
@@ -44,7 +44,7 @@ public class FileStatsExtractingAvroFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+            RowType type, int[][] projection, List<Predicate> filters) {
         return avro.createReaderFactory(type, projection, filters);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index 23569b21..94e11003 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -42,7 +42,7 @@ public class FlushingFileFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+            RowType type, int[][] projection, List<Predicate> filters) {
         return format.createReaderFactory(type, projection, filters);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index ce15619f..5e58e73b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -24,7 +24,9 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -258,6 +260,7 @@ public class PredicateTest {
     public void testIn() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         Predicate predicate = builder.in(0, Arrays.asList(1, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
 
         assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
@@ -274,6 +277,7 @@ public class PredicateTest {
     public void testInNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         Predicate predicate = builder.in(0, Arrays.asList(1, null, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
 
         assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
@@ -290,6 +294,7 @@ public class PredicateTest {
     public void testNotIn() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         Predicate predicate = builder.notIn(0, Arrays.asList(1, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
 
         assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
@@ -309,6 +314,7 @@ public class PredicateTest {
     public void testNotInNull() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         Predicate predicate = builder.notIn(0, Arrays.asList(1, null, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
 
         assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
@@ -324,6 +330,111 @@ public class PredicateTest {
                 .isEqualTo(false);
     }
 
+    @Test
+    public void testLargeIn() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        List<Object> literals = new ArrayList<>();
+        literals.add(1);
+        literals.add(3);
+        for (int i = 10; i < 30; i++) {
+            literals.add(i);
+        }
+        Predicate predicate = builder.in(0, literals);
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+    }
+
+    @Test
+    public void testLargeInNull() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        List<Object> literals = new ArrayList<>();
+        literals.add(1);
+        literals.add(null);
+        literals.add(3);
+        for (int i = 10; i < 30; i++) {
+            literals.add(i);
+        }
+        Predicate predicate = builder.in(0, literals);
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+    }
+
+    @Test
+    public void testLargeNotIn() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        List<Object> literals = new ArrayList<>();
+        literals.add(1);
+        literals.add(3);
+        for (int i = 10; i < 30; i++) {
+            literals.add(i);
+        }
+        Predicate predicate = builder.notIn(0, literals);
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+    }
+
+    @Test
+    public void testLargeNotInNull() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        List<Object> literals = new ArrayList<>();
+        literals.add(1);
+        literals.add(null);
+        literals.add(3);
+        for (int i = 10; i < 30; i++) {
+            literals.add(i);
+        }
+        Predicate predicate = builder.notIn(0, literals);
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+        assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+                .isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)}))
+                .isEqualTo(false);
+    }
+
     @Test
     public void testAnd() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index f4498949..57cfb1bd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -168,7 +168,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
-        conf.set(CoreOptions.FILE_FORMAT, "avro");
         conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
         configure.accept(conf);
         SchemaManager schemaManager = new SchemaManager(tablePath);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 24733a7b..9f3604d3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -171,7 +171,6 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
-        conf.set(CoreOptions.FILE_FORMAT, "avro");
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
         configure.accept(conf);
         SchemaManager schemaManager = new SchemaManager(tablePath);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index a7a5a318..6a19f043 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -209,6 +209,47 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.close();
     }
 
+    @Override
+    @Test
+    public void testReadFilter() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+
+        write.write(GenericRowData.of(1, 10, 100L));
+        write.write(GenericRowData.of(1, 20, 200L));
+        commit.commit("0", write.prepareCommit(true));
+
+        write.write(GenericRowData.of(1, 30, 300L));
+        write.write(GenericRowData.of(1, 40, 400L));
+        commit.commit("1", write.prepareCommit(true));
+
+        write.write(GenericRowData.of(1, 50, 500L));
+        write.write(GenericRowData.of(1, 60, 600L));
+        commit.commit("2", write.prepareCommit(true));
+
+        write.close();
+
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        List<Split> splits = table.newScan().plan().splits;
+
+        TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "1|10|100",
+                                "1|20|200",
+                                "1|30|300",
+                                "1|40|400",
+                                "1|50|500",
+                                "1|60|600"));
+
+        read = table.newRead().withFilter(builder.equal(1, 30));
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+    }
+
     protected FileStoreTable createFileStoreTable(boolean changelogFile) throws Exception {
         return createFileStoreTable(conf -> conf.set(CoreOptions.CHANGELOG_FILE, changelogFile));
     }
@@ -219,7 +260,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
-        conf.set(CoreOptions.FILE_FORMAT, "avro");
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
         configure.accept(conf);
         SchemaManager schemaManager = new SchemaManager(tablePath);
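Why this override loosens the base expectation: in a changelog-with-key table, rows sharing a primary key must be merged across files before they can be filtered, so withFilter(...) may only skip data at a granularity that preserves merge correctness. Filtering on the value field (index 2) therefore still returns all six rows, while filtering on the key field narrows the result to the run containing key 30. Either way the reader returns a superset, and consumers are expected to re-apply the predicate, along these lines (API calls are from this diff; the per-row re-check is illustrative):

    TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
    for (Split split : table.newScan().plan().splits) {
        RecordReader<RowData> reader = read.createReader(split); // may throw IOException
        // the filter acted only as coarse pruning, so each emitted row still
        // needs the check rowData.getLong(2) == 300L before it is used
    }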
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 2b818185..8b9250bf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -40,6 +40,7 @@ import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -133,6 +134,34 @@ public abstract class FileStoreTableTestBase {
         assertThat(splits.get(0).bucket()).isEqualTo(1);
     }
 
+    @Test
+    public void testReadFilter() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+
+        write.write(GenericRowData.of(1, 10, 100L));
+        write.write(GenericRowData.of(1, 20, 200L));
+        commit.commit("0", write.prepareCommit(true));
+
+        write.write(GenericRowData.of(1, 30, 300L));
+        write.write(GenericRowData.of(1, 40, 400L));
+        commit.commit("1", write.prepareCommit(true));
+
+        write.write(GenericRowData.of(1, 50, 500L));
+        write.write(GenericRowData.of(1, 60, 600L));
+        commit.commit("2", write.prepareCommit(true));
+
+        write.close();
+
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        List<Split> splits = table.newScan().plan().splits;
+        TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,
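The expected rows are worth spelling out. Assuming each commit flushes a single data file (the usual behavior in this test setup), the per-file stats on field 2 are:

    // commit "0" -> rows 100, 200 -> stats [100, 200] -> pruned by equal(2, 300L)
    // commit "1" -> rows 300, 400 -> stats [300, 400] -> kept, both rows surface
    // commit "2" -> rows 500, 600 -> stats [500, 600] -> pruned

Push-down works at file granularity, which is why the assertion expects "1|40|400" alongside the actual match rather than the single matching row.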
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index d4ee1d13..7252a6a6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -60,6 +60,10 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
         testWritePreemptMemory(true);
     }
 
+    @Override // this has been tested in ChangelogWithKeyFileStoreTableTest
+    @Test
+    public void testReadFilter() {}
+
     private void testWritePreemptMemory(boolean singlePartition) throws Exception {
         // write
         FileStoreTable table = createFileStoreTable();
@@ -93,7 +97,6 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
-        conf.set(CoreOptions.FILE_FORMAT, "avro");
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
         conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index 66e1901a..90618f1d 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -31,8 +31,8 @@ import org.apache.flink.formats.avro.RowDataToAvroConverters;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -65,7 +65,7 @@ public class AvroFileFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+            RowType type, int[][] projection, List<Predicate> filters) {
         // avro is a file format that keeps schemas in file headers,
         // if the schema given to the reader is not equal to the schema in header,
         // reader will automatically map the fields and give back records with our desired
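For Avro only the signature changes: the format API now takes the table store's own Predicate list, and judging from this hunk the Avro reader accepts the filters without using them (there are no row-group statistics to prune with). A sketch of the now-uniform call, where rowType and projection are placeholders:

    List<Predicate> filters = Collections.singletonList(builder.equal(0, 10L));
    BulkFormat<RowData, FileSourceSplit> readerFactory =
            fileFormat.createReaderFactory(rowType, projection, filters);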
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 60ebece8..880074c2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -29,8 +29,7 @@ import org.apache.flink.orc.OrcSplitReaderUtil;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.store.utils.Projection;
@@ -68,12 +67,12 @@ public class OrcFileFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+            RowType type, int[][] projection, List<Predicate> filters) {
         List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
 
         if (filters != null) {
-            for (Expression pred : filters) {
-                OrcFilters.Predicate orcPred = OrcFilters.toOrcPredicate(pred);
+            for (Predicate pred : filters) {
+                OrcFilters.Predicate orcPred = OrcFilterConverter.toOrcPredicate(pred);
                 if (orcPred != null) {
                     orcPredicates.add(orcPred);
                 }
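Note the null check in the loop above: conversion is best-effort. Any predicate the converter cannot express comes back as null and is simply not pushed, which is safe because pushing fewer conjuncts only makes the reader return a superset. A sketch of the contrast (the behavior of in(...) with a large literal list is inferred from the converter tests below, not from this hunk):

    // manyLiterals is a hypothetical list too large to expand into OR-of-equals
    OrcFilters.Predicate pushed =
            OrcFilterConverter.toOrcPredicate(builder.equal(0, 10L));       // convertible
    OrcFilters.Predicate dropped =
            OrcFilterConverter.toOrcPredicate(builder.in(0, manyLiterals)); // null, skipped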
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java
new file mode 100644
index 00000000..acccd161
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.OrcFilters.Predicate;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.predicate.CompoundPredicate;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.GreaterOrEqual;
+import org.apache.flink.table.store.file.predicate.GreaterThan;
+import org.apache.flink.table.store.file.predicate.IsNotNull;
+import org.apache.flink.table.store.file.predicate.IsNull;
+import org.apache.flink.table.store.file.predicate.LeafFunction;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.LessOrEqual;
+import org.apache.flink.table.store.file.predicate.LessThan;
+import org.apache.flink.table.store.file.predicate.NotEqual;
+import org.apache.flink.table.store.file.predicate.Or;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.util.function.Function;
+
+/** Utility class that converts table store predicates into ORC filter push-down predicates. */
+public class OrcFilterConverter {
+
+    private static final ImmutableMap<
+                    Class<? extends LeafFunction>, Function<LeafPredicate, Predicate>>
+            FILTERS =
+                    new ImmutableMap.Builder<
+                                    Class<? extends LeafFunction>,
+                                    Function<LeafPredicate, Predicate>>()
+                            .put(IsNull.class, OrcFilterConverter::convertIsNull)
+                            .put(IsNotNull.class, OrcFilterConverter::convertIsNotNull)
+                            .put(
+                                    Equal.class,
+                                    call -> convertBinary(call, OrcFilterConverter::convertEquals))
+                            .put(
+                                    NotEqual.class,
+                                    call ->
+                                            convertBinary(
+                                                    call, OrcFilterConverter::convertNotEquals))
+                            .put(
+                                    GreaterThan.class,
+                                    call ->
+                                            convertBinary(
+                                                    call, OrcFilterConverter::convertGreaterThan))
+                            .put(
+                                    GreaterOrEqual.class,
+                                    call ->
+                                            convertBinary(
+                                                    call,
+                                                    OrcFilterConverter::convertGreaterThanEquals))
+                            .put(
+                                    LessThan.class,
+                                    call ->
+                                            convertBinary(
+                                                    call, OrcFilterConverter::convertLessThan))
+                            .put(
+                                    LessOrEqual.class,
+                                    call ->
+                                            convertBinary(
+                                                    call,
+                                                    OrcFilterConverter::convertLessThanEquals))
+                            .build();
+
+    private static Predicate convertIsNull(LeafPredicate predicate) {
+        PredicateLeaf.Type colType = toOrcType(predicate.type());
+        if (colType == null) {
+            return null;
+        }
+
+        return new OrcFilters.IsNull(predicate.fieldName(), colType);
+    }
+
+    private static Predicate convertIsNotNull(LeafPredicate predicate) {
+        Predicate isNull = convertIsNull(predicate);
+        if (isNull == null) {
+            return null;
+        }
+        return new OrcFilters.Not(isNull);
+    }
+
+    private static Predicate convertOr(CompoundPredicate or) {
+        if (or.children().size() != 2) {
+            throw new RuntimeException(
+                    "Expected OR predicate with exactly 2 children, but got: "
+                            + or.children().size());
+        }
+
+        Predicate c1 = toOrcPredicate(or.children().get(0));
+        if (c1 == null) {
+            return null;
+        }
+        Predicate c2 = toOrcPredicate(or.children().get(1));
+        if (c2 == null) {
+            return null;
+        }
+
+        return new OrcFilters.Or(c1, c2);
+    }
+
+    private static Predicate convertBinary(
+            LeafPredicate predicate,
+            TriFunction<String, PredicateLeaf.Type, Serializable, Predicate> func) {
+        PredicateLeaf.Type litType = toOrcType(predicate.type());
+        if (litType == null) {
+            return null;
+        }
+
+        String colName = predicate.fieldName();
+
+        // the ORC Predicate classes require a Serializable literal
+        Object orcObj = toOrcObject(litType, predicate.literals().get(0));
+        if (!(orcObj instanceof Serializable)) {
+            return null;
+        }
+        Serializable literal = (Serializable) orcObj;
+
+        return func.apply(colName, litType, literal);
+    }
+
+    private static Predicate convertEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Equals(colName, litType, literal);
+    }
+
+    private static Predicate convertNotEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Not(convertEquals(colName, litType, literal));
+    }
+
+    private static Predicate convertGreaterThan(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Not(new OrcFilters.LessThanEquals(colName, litType, literal));
+    }
+
+    private static Predicate convertGreaterThanEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Not(new OrcFilters.LessThan(colName, litType, literal));
+    }
+
+    private static Predicate convertLessThan(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.LessThan(colName, litType, literal);
+    }
+
+    private static Predicate convertLessThanEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.LessThanEquals(colName, litType, literal);
+    }
+
+    @Nullable
+    public static Predicate toOrcPredicate(
+            org.apache.flink.table.store.file.predicate.Predicate expression) {
+        if (expression instanceof CompoundPredicate) {
+            CompoundPredicate compound = (CompoundPredicate) expression;
+            if (compound.function().equals(Or.INSTANCE)) {
+                return convertOr(compound);
+            }
+        } else if (expression instanceof LeafPredicate) {
+            LeafPredicate leaf = (LeafPredicate) expression;
+            Function<LeafPredicate, Predicate> function = FILTERS.get(leaf.function().getClass());
+            if (function == null) {
+                return null;
+            }
+            return function.apply(leaf);
+        }
+        return null;
+    }
+
+    @Nullable
+    private static Object toOrcObject(PredicateLeaf.Type litType, Object literalObj) {
+        if (literalObj == null) {
+            return null;
+        }
+
+        switch (litType) {
+            case STRING:
+                return literalObj.toString();
+            case DECIMAL:
+                return ((DecimalData) literalObj).toBigDecimal();
+            case DATE:
+                return Date.valueOf(LocalDate.ofEpochDay(((Number) literalObj).longValue()));
+            case TIMESTAMP:
+                return ((TimestampData) literalObj).toTimestamp();
+            default:
+                return literalObj;
+        }
+    }
+
+    @Nullable
+    private static PredicateLeaf.Type toOrcType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return PredicateLeaf.Type.LONG;
+            case FLOAT:
+            case DOUBLE:
+                return PredicateLeaf.Type.FLOAT;
+            case BOOLEAN:
+                return PredicateLeaf.Type.BOOLEAN;
+            case CHAR:
+            case VARCHAR:
+                return PredicateLeaf.Type.STRING;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return PredicateLeaf.Type.TIMESTAMP;
+            case DATE:
+                return PredicateLeaf.Type.DATE;
+            case DECIMAL:
+                return PredicateLeaf.Type.DECIMAL;
+            default:
+                return null;
+        }
+    }
+}
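A usage sketch for the converter, mirroring the rewrite rules above (greaterThan has no direct ORC counterpart here, so it becomes Not(LessThanEquals); the field name f0 is the RowType default):

    import org.apache.flink.orc.OrcFilters;
    import org.apache.flink.table.store.file.predicate.PredicateBuilder;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.RowType;

    PredicateBuilder builder = new PredicateBuilder(RowType.of(new BigIntType()));
    OrcFilters.Predicate orcPredicate =
            OrcFilterConverter.toOrcPredicate(builder.greaterThan(0, 10L));
    // conceptually: NOT(f0 <= 10)

Among compound predicates only OR is handled; AND presumably never reaches the converter as a compound, since OrcFileFormat receives the filters as a list of conjuncts and converts them one by one.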
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
new file mode 100644
index 00000000..1c11af1c
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link OrcFilterConverter}. */
+public class OrcFilterConverterTest {
+
+    @Test
+    public void testApplyPredicate() {
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(
+                                        new RowType.RowField("long1", new BigIntType()))));
+        test(builder.isNull(0), new OrcFilters.IsNull("long1", PredicateLeaf.Type.LONG));
+        test(
+                builder.isNotNull(0),
+                new OrcFilters.Not(new OrcFilters.IsNull("long1", PredicateLeaf.Type.LONG)));
+        test(builder.equal(0, 10L), new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 10));
+        test(
+                builder.notEqual(0, 10L),
+                new OrcFilters.Not(new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 10)));
+        test(
+                builder.lessThan(0, 10L),
+                new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 10));
+        test(
+                builder.lessOrEqual(0, 10L),
+                new OrcFilters.LessThanEquals("long1", PredicateLeaf.Type.LONG, 10));
+        test(
+                builder.greaterThan(0, 10L),
+                new OrcFilters.Not(
+                        new OrcFilters.LessThanEquals("long1", PredicateLeaf.Type.LONG, 10)));
+        test(
+                builder.greaterOrEqual(0, 10L),
+                new OrcFilters.Not(new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 10)));
+
+        test(
+                builder.in(0, Arrays.asList(1L, 2L, 3L)),
+                new OrcFilters.Or(
+                        new OrcFilters.Or(
+                                new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 1),
+                                new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 2)),
+                        new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 3)));
+    }
+
+    private void test(Predicate predicate, OrcFilters.Predicate orcPredicate) {
+        assertThat(OrcFilterConverter.toOrcPredicate(predicate))
+                .hasToString(orcPredicate.toString());
+    }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index bcdf4814..5d24bef9 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
@@ -52,7 +53,7 @@ import java.util.Optional;
 public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer> {
 
     @Override
-    public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+    public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
         FileStoreTable table = createFileStoreTable(jobConf);
         TableScan scan = table.newScan();
         createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
@@ -66,8 +67,10 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
             InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
         FileStoreTable table = createFileStoreTable(jobConf);
         TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
+        TableRead read = table.newRead();
+        createPredicate(table.schema(), jobConf).ifPresent(read::withFilter);
         return new TableStoreRecordReader(
-                table.newRead(),
+                read,
                 split,
                 table.schema().fieldNames(),
                 Arrays.asList(ColumnProjectionUtils.getReadColumnNames(jobConf)));
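The Hive connector now consults the same predicate twice: once at planning time to prune splits, and once per record reader so it reaches the ORC reader underneath. Condensed from this diff:

    Optional<Predicate> predicate = createPredicate(table.schema(), jobConf);
    predicate.ifPresent(scan::withFilter); // getSplits: skip whole splits
    predicate.ifPresent(read::withFilter); // getRecordReader: push into the format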
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index 0ab02e1a..37d799ea 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -94,7 +94,9 @@ public class SearchArgumentToPredicateConverterTest {
                 new SearchArgumentToPredicateConverter(
                         sarg, Collections.singletonList("a"), Collections.singletonList(flinkType));
 
-        Predicate expected = new PredicateBuilder(RowType.of(flinkType)).equal(0, flinkLiteral);
+        Predicate expected =
+                new PredicateBuilder(RowType.of(new LogicalType[] {flinkType}, new String[] {"a"}))
+                        .equal(0, flinkLiteral);
         Predicate actual = converter.convert().orElse(null);
         assertThat(actual).isEqualTo(expected);
     }
@@ -176,6 +178,15 @@ public class SearchArgumentToPredicateConverterTest {
                 builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L).build();
         Predicate expected = BUILDER.in(1, Arrays.asList(100L, 200L, 300L));
         assertExpected(sarg, expected);
+
+        builder = SearchArgumentFactory.newBuilder();
+        Object[] literals = new Object[30];
+        for (int i = 0; i < literals.length; i++) {
+            literals[i] = i * 100L;
+        }
+        sarg = builder.in("f_bigint", PredicateLeaf.Type.LONG, literals).build();
+        expected = BUILDER.in(1, Arrays.asList(literals));
+        assertExpected(sarg, expected);
     }
 
     @Test
@@ -211,6 +222,33 @@ public class SearchArgumentToPredicateConverterTest {
         assertExpected(sarg, expected);
     }
 
+    @Test
+    public void testLargeIn() {
+        SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+        Object[] literals = new Object[30];
+        literals[0] = null;
+        for (int i = 1; i < literals.length; i++) {
+            literals[i] = i * 100L;
+        }
+        SearchArgument sarg = builder.in("f_bigint", PredicateLeaf.Type.LONG, literals).build();
+        Predicate expected = BUILDER.in(1, Arrays.asList(literals));
+        assertExpected(sarg, expected);
+    }
+
+    @Test
+    public void testLargeNotIn() {
+        SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+        Object[] literals = new Object[30];
+        literals[0] = null;
+        for (int i = 1; i < literals.length; i++) {
+            literals[i] = i * 100L;
+        }
+        SearchArgument sarg =
+                builder.startNot().in("f_bigint", PredicateLeaf.Type.LONG, literals).end().build();
+        Predicate expected = BUILDER.notIn(1, Arrays.asList(literals));
+        assertExpected(sarg, expected);
+    }
+
     @Test
     public void testBetween() {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
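These cases pin down the handling of wide IN lists: with 30 literals (including a null in the Large variants) the expected predicate is the in/notIn form itself rather than an OR-of-equals expansion. The exact expansion threshold is not visible in this hunk; the intended contrast, flagged as an assumption:

    Predicate small = BUILDER.in(1, Arrays.asList(100L, 200L, 300L)); // may expand to Or(Equal, ...)
    Predicate large = BUILDER.in(1, Arrays.asList(literals));         // stays a single In/NotIn leaf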
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index f247f908..c94db2eb 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -18,9 +18,11 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -31,6 +33,9 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.List;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
 
 /** A Spark {@link PartitionReaderFactory} for table store. */
 public class SparkReaderFactory implements PartitionReaderFactory {
@@ -39,10 +44,13 @@ public class SparkReaderFactory implements PartitionReaderFactory {
 
     private final FileStoreTable table;
     private final int[] projectedFields;
+    private final List<Predicate> predicates;
 
-    public SparkReaderFactory(FileStoreTable table, int[] projectedFields) {
+    public SparkReaderFactory(
+            FileStoreTable table, int[] projectedFields, List<Predicate> predicates) {
         this.table = table;
         this.projectedFields = projectedFields;
+        this.predicates = predicates;
     }
 
     private RowType readRowType() {
@@ -52,11 +60,12 @@ public class SparkReaderFactory implements PartitionReaderFactory {
     @Override
     public PartitionReader<InternalRow> createReader(InputPartition partition) {
         RecordReader<RowData> reader;
+        TableRead read = table.newRead().withProjection(projectedFields);
+        if (!predicates.isEmpty()) {
+            read.withFilter(and(predicates));
+        }
         try {
-            reader =
-                    table.newRead()
-                            .withProjection(projectedFields)
-                            .createReader(((SparkInputPartition) partition).split());
+            reader = read.createReader(((SparkInputPartition) partition).split());
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
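On the Spark side the accepted filters arrive as a list and are AND-combined once per partition reader; this stays correct regardless of how many of them survive format-level conversion, since every pushed conjunct must hold independently. Condensed from this diff:

    TableRead read = table.newRead().withProjection(projectedFields);
    if (!predicates.isEmpty()) {
        read.withFilter(PredicateBuilder.and(predicates));
    }
    reader = read.createReader(((SparkInputPartition) partition).split());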
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index da96491d..5db24a09 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -79,7 +79,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
 
             @Override
             public PartitionReaderFactory createReaderFactory() {
-                return new SparkReaderFactory(table, projectedFields);
+                return new SparkReaderFactory(table, projectedFields, predicates);
             }
         };
     }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
index 8b35a31f..7180f3d6 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -122,6 +122,16 @@ public class SparkFilterConverterTest {
         Predicate expectedIn = builder.in(0, Arrays.asList(1, null, 2));
         Predicate actualIn = converter.convert(in);
         assertThat(actualIn).isEqualTo(expectedIn);
+
+        Object[] literals = new Object[30];
+        literals[0] = null;
+        for (int i = 1; i < literals.length; i++) {
+            literals[i] = i * 100;
+        }
+        In largeIn = In.apply(field, literals);
+        Predicate expectedLargeIn = builder.in(0, Arrays.asList(literals));
+        Predicate actualLargeIn = converter.convert(largeIn);
+        assertThat(actualLargeIn).isEqualTo(expectedLargeIn);
     }
 
     @Test