You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/22 04:03:54 UTC

[flink-table-store] branch master updated: [FLINK-28179] LeafPredicate accepts multiple literals

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new cceca50  [FLINK-28179] LeafPredicate accepts multiple literals
cceca50 is described below

commit cceca50c778a0c450f959d279c69f98a5960ad3f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Jun 22 12:03:49 2022 +0800

    [FLINK-28179] LeafPredicate accepts multiple literals
    
    This closes #168
---
 .../store/connector/source/TableStoreSource.java   |   4 +-
 .../file/operation/AbstractFileStoreScan.java      |   9 +-
 .../store/file/operation/FileStoreCommitImpl.java  |   2 +-
 .../predicate/{IsNull.java => CompareUtils.java}   |  42 ++--
 .../flink/table/store/file/predicate/Equal.java    |  25 +--
 .../table/store/file/predicate/GreaterOrEqual.java |  23 +--
 .../table/store/file/predicate/GreaterThan.java    |  23 +--
 .../table/store/file/predicate/IsNotNull.java      |  17 +-
 .../flink/table/store/file/predicate/IsNull.java   |  17 +-
 .../{IsNotNull.java => LeafBinaryFunction.java}    |  26 +--
 .../predicate/{IsNull.java => LeafFunction.java}   |  26 +--
 .../table/store/file/predicate/LeafPredicate.java  |  79 ++++++--
 .../{IsNotNull.java => LeafUnaryFunction.java}     |  25 ++-
 .../table/store/file/predicate/LessOrEqual.java    |  23 +--
 .../flink/table/store/file/predicate/LessThan.java |  23 +--
 .../flink/table/store/file/predicate/Literal.java  | 173 ----------------
 .../flink/table/store/file/predicate/NotEqual.java |  25 +--
 .../store/file/predicate/PredicateBuilder.java     | 172 ++++++++++++----
 .../store/file/predicate/PredicateConverter.java   |  98 +++++----
 .../table/store/file/predicate/StartsWith.java     |  27 ++-
 .../flink/table/store/table/sink/TableCompact.java |   3 +-
 .../flink/table/store/table/source/TableScan.java  |   5 +-
 .../file/operation/KeyValueFileStoreScanTest.java  |   4 +-
 .../store/file/predicate/PredicateBuilderTest.java |  32 +--
 .../file/predicate/PredicateConverterTest.java     |  34 ++--
 .../table/store/file/predicate/PredicateTest.java  | 220 +++++++--------------
 .../store/table/AppendOnlyFileStoreTableTest.java  |  12 +-
 .../ChangelogValueCountFileStoreTableTest.java     |  12 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  20 +-
 .../store/SearchArgumentToPredicateConverter.java  |  26 ++-
 .../SearchArgumentToPredicateConverterTest.java    |  63 +++---
 .../table/store/spark/SparkFilterConverter.java    |  37 ++--
 .../store/spark/SparkFilterConverterTest.java      |  53 +++--
 33 files changed, 624 insertions(+), 756 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 5b92912..d181e42 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
@@ -184,8 +185,9 @@ public class TableStoreSource
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
         List<Predicate> converted = new ArrayList<>();
+        RowType rowType = table.schema().logicalRowType();
         for (ResolvedExpression filter : filters) {
-            PredicateConverter.convert(filter).ifPresent(converted::add);
+            PredicateConverter.convert(rowType, filter).ifPresent(converted::add);
         }
         predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
         return Result.of(filters, filters);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 1a82f29..4745d41 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -90,16 +89,14 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     @Override
     public FileStoreScan withPartitionFilter(List<BinaryRowData> partitions) {
+        PredicateBuilder builder = new PredicateBuilder(partitionConverter.rowType());
         Function<BinaryRowData, Predicate> partitionToPredicate =
                 p -> {
                     List<Predicate> fieldPredicates = new ArrayList<>();
                     Object[] partitionObjects = partitionConverter.convert(p);
                     for (int i = 0; i < partitionConverter.getArity(); i++) {
-                        Literal l =
-                                new Literal(
-                                        partitionConverter.rowType().getTypeAt(i),
-                                        partitionObjects[i]);
-                        fieldPredicates.add(PredicateBuilder.equal(i, l));
+                        Object partition = partitionObjects[i];
+                        fieldPredicates.add(builder.equal(i, partition));
                     }
                     return PredicateBuilder.and(fieldPredicates);
                 };
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 79b0f8c..24d3301 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -204,7 +204,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
         // sanity check, all changes must be done within the given partition
-        Predicate partitionFilter = PredicateConverter.CONVERTER.fromMap(partition, partitionType);
+        Predicate partitionFilter = PredicateConverter.fromMap(partition, partitionType);
         if (partitionFilter != null) {
             for (ManifestEntry entry : appendChanges) {
                 if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
similarity index 53%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
index a227972..8c8ae4d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
@@ -18,29 +18,23 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
-
-import java.util.Optional;
-
-/** A {@link LeafPredicate.Function} to eval is null. */
-public class IsNull implements LeafPredicate.Function {
-
-    public static final IsNull INSTANCE = new IsNull();
-
-    private IsNull() {}
-
-    @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        return values[index] == null;
-    }
-
-    @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        return fieldStats[index].nullCount() > 0;
-    }
-
-    @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(IsNotNull.INSTANCE, index, literal));
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** Utils for comparator. */
+public class CompareUtils {
+    private CompareUtils() {}
+
+    private static final BytePrimitiveArrayComparator BINARY_COMPARATOR =
+            new BytePrimitiveArrayComparator(true);
+
+    public static int compareLiteral(LogicalType type, Object v1, Object v2) {
+        if (v1 instanceof Comparable) {
+            return ((Comparable<Object>) v1).compareTo(v2);
+        } else if (v1 instanceof byte[]) {
+            return BINARY_COMPARATOR.compare((byte[]) v1, (byte[]) v2);
+        } else {
+            throw new RuntimeException("Unsupported type: " + type);
+        }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
index 074acf1..cf9a0f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -19,34 +19,35 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval equal. */
-public class Equal implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval equal. */
+public class Equal extends LeafBinaryFunction {
 
     public static final Equal INSTANCE = new Equal();
 
     private Equal() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        Object field = values[index];
-        return field != null && literal.compareValueTo(field) == 0;
+    public boolean test(LogicalType type, Object field, Object literal) {
+        return field != null && compareLiteral(type, literal, field) == 0;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        FieldStats stats = fieldStats[index];
-        if (rowCount == stats.nullCount()) {
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+        if (rowCount == fieldStats.nullCount()) {
             return false;
         }
-        return literal.compareValueTo(stats.minValue()) >= 0
-                && literal.compareValueTo(stats.maxValue()) <= 0;
+        return compareLiteral(type, literal, fieldStats.minValue()) >= 0
+                && compareLiteral(type, literal, fieldStats.maxValue()) <= 0;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(NotEqual.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(NotEqual.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
index 9527d3b..e93857e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -19,33 +19,34 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval greater or equal. */
-public class GreaterOrEqual implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval greater or equal. */
+public class GreaterOrEqual extends LeafBinaryFunction {
 
     public static final GreaterOrEqual INSTANCE = new GreaterOrEqual();
 
     private GreaterOrEqual() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        Object field = values[index];
-        return field != null && literal.compareValueTo(field) <= 0;
+    public boolean test(LogicalType type, Object field, Object literal) {
+        return field != null && compareLiteral(type, literal, field) <= 0;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        FieldStats stats = fieldStats[index];
-        if (rowCount == stats.nullCount()) {
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+        if (rowCount == fieldStats.nullCount()) {
             return false;
         }
-        return literal.compareValueTo(stats.maxValue()) <= 0;
+        return compareLiteral(type, literal, fieldStats.maxValue()) <= 0;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(LessThan.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(LessThan.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
index d097f88..3e771ae 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -19,33 +19,34 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval greater. */
-public class GreaterThan implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval greater. */
+public class GreaterThan extends LeafBinaryFunction {
 
     public static final GreaterThan INSTANCE = new GreaterThan();
 
     private GreaterThan() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        Object field = values[index];
-        return field != null && literal.compareValueTo(field) < 0;
+    public boolean test(LogicalType type, Object field, Object literal) {
+        return field != null && compareLiteral(type, literal, field) < 0;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        FieldStats stats = fieldStats[index];
-        if (rowCount == stats.nullCount()) {
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+        if (rowCount == fieldStats.nullCount()) {
             return false;
         }
-        return literal.compareValueTo(stats.maxValue()) < 0;
+        return compareLiteral(type, literal, fieldStats.maxValue()) < 0;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(LessOrEqual.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(LessOrEqual.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index e4e19c5..093ad66 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -19,28 +19,29 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval is not null. */
-public class IsNotNull implements LeafPredicate.Function {
+/** A {@link LeafUnaryFunction} to eval is not null. */
+public class IsNotNull extends LeafUnaryFunction {
 
     public static final IsNotNull INSTANCE = new IsNotNull();
 
     private IsNotNull() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        return values[index] != null;
+    public boolean test(LogicalType type, Object field) {
+        return field != null;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        return fieldStats[index].nullCount() < rowCount;
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
+        return fieldStats.nullCount() < rowCount;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(IsNull.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(IsNull.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index a227972..5aa4842 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -19,28 +19,29 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval is null. */
-public class IsNull implements LeafPredicate.Function {
+/** A {@link LeafUnaryFunction} to eval is null. */
+public class IsNull extends LeafUnaryFunction {
 
     public static final IsNull INSTANCE = new IsNull();
 
     private IsNull() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        return values[index] == null;
+    public boolean test(LogicalType type, Object field) {
+        return field == null;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        return fieldStats[index].nullCount() > 0;
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
+        return fieldStats.nullCount() > 0;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(IsNotNull.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(IsNotNull.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
similarity index 55%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
index e4e19c5..0b91cb7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
@@ -19,28 +19,28 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
-import java.util.Optional;
+import java.util.List;
 
-/** A {@link LeafPredicate.Function} to eval is not null. */
-public class IsNotNull implements LeafPredicate.Function {
+/** Function to test a field with a literal. */
+public abstract class LeafBinaryFunction implements LeafFunction {
 
-    public static final IsNotNull INSTANCE = new IsNotNull();
+    private static final long serialVersionUID = 1L;
 
-    private IsNotNull() {}
+    public abstract boolean test(LogicalType type, Object field, Object literal);
 
-    @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        return values[index] != null;
-    }
+    public abstract boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, Object literal);
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        return fieldStats[index].nullCount() < rowCount;
+    public boolean test(LogicalType type, Object field, List<Object> literals) {
+        return test(type, field, literals.get(0));
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(IsNull.INSTANCE, index, literal));
+    public boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+        return test(type, rowCount, fieldStats, literals.get(0));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
similarity index 59%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
index a227972..97bfceb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
@@ -19,28 +19,18 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
+import java.io.Serializable;
+import java.util.List;
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval is null. */
-public class IsNull implements LeafPredicate.Function {
+/** Function to test a field with literals. */
+public interface LeafFunction extends Serializable {
 
-    public static final IsNull INSTANCE = new IsNull();
+    boolean test(LogicalType type, Object field, List<Object> literals);
 
-    private IsNull() {}
+    boolean test(LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals);
 
-    @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        return values[index] == null;
-    }
-
-    @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        return fieldStats[index].nullCount() > 0;
-    }
-
-    @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(IsNotNull.INSTANCE, index, literal));
-    }
+    Optional<LeafFunction> negate();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 244058e..2e8d6c8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -18,70 +18,107 @@
 
 package org.apache.flink.table.store.file.predicate;
 
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
-import java.io.Serializable;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
-/** Leaf node of a {@link Predicate} tree. Compares a field in the row with an {@link Literal}. */
+/** Leaf node of a {@link Predicate} tree. Compares a field in the row with literals. */
 public class LeafPredicate implements Predicate {
 
-    private final Function function;
+    private static final long serialVersionUID = 1L;
+
+    private final LeafFunction function;
+    private final LogicalType type;
     private final int index;
-    private final Literal literal;
 
-    public LeafPredicate(Function function, int index, Literal literal) {
+    private transient List<Object> literals;
+
+    public LeafPredicate(
+            LeafFunction function, LogicalType type, int index, List<Object> literals) {
         this.function = function;
+        this.type = type;
         this.index = index;
-        this.literal = literal;
+        this.literals = literals;
     }
 
-    public Function function() {
+    public LeafFunction function() {
         return function;
     }
 
+    @Nullable
+    public LogicalType type() {
+        return type;
+    }
+
     public int index() {
         return index;
     }
 
-    public Literal literal() {
-        return literal;
+    public List<Object> literals() {
+        return literals;
     }
 
     @Override
     public boolean test(Object[] values) {
-        return function.test(values, index, literal);
+        return function.test(type, values[index], literals);
     }
 
     @Override
     public boolean test(long rowCount, FieldStats[] fieldStats) {
-        return function.test(rowCount, fieldStats, index, literal);
+        return function.test(type, rowCount, fieldStats[index], literals);
     }
 
     @Override
     public Optional<Predicate> negate() {
-        return function.negate(index, literal);
+        return function.negate().map(negate -> new LeafPredicate(negate, type, index, literals));
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof LeafPredicate)) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
         LeafPredicate that = (LeafPredicate) o;
-        return Objects.equals(function, that.function)
-                && index == that.index
-                && Objects.equals(literal, that.literal);
+        return index == that.index
+                && Objects.equals(function, that.function)
+                && Objects.equals(type, that.type)
+                && Objects.equals(literals, that.literals);
     }
 
-    /** Function to compare a field in the row with an {@link Literal}. */
-    public interface Function extends Serializable {
+    @Override
+    public int hashCode() {
+        return Objects.hash(function, type, index, literals);
+    }
 
-        boolean test(Object[] values, int index, Literal literal);
+    private ListSerializer<Object> objectsSerializer() {
+        return new ListSerializer<>(
+                NullableSerializer.wrapIfNullIsNotSupported(
+                        InternalSerializers.create(type), false));
+    }
 
-        boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal);
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        objectsSerializer().serialize(literals, new DataOutputViewStreamWrapper(out));
+    }
 
-        Optional<Predicate> negate(int index, Literal literal);
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        literals = objectsSerializer().deserialize(new DataInputViewStreamWrapper(in));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
similarity index 59%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
index e4e19c5..aa414c6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
@@ -19,28 +19,27 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
-import java.util.Optional;
+import java.util.List;
 
-/** A {@link LeafPredicate.Function} to eval is not null. */
-public class IsNotNull implements LeafPredicate.Function {
+/** Function to test a field. */
+public abstract class LeafUnaryFunction implements LeafFunction {
 
-    public static final IsNotNull INSTANCE = new IsNotNull();
+    private static final long serialVersionUID = 1L;
 
-    private IsNotNull() {}
+    public abstract boolean test(LogicalType type, Object value);
 
-    @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        return values[index] != null;
-    }
+    public abstract boolean test(LogicalType type, long rowCount, FieldStats fieldStats);
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        return fieldStats[index].nullCount() < rowCount;
+    public boolean test(LogicalType type, Object value, List<Object> literals) {
+        return test(type, value);
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(IsNull.INSTANCE, index, literal));
+    public boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+        return test(type, rowCount, fieldStats);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
index 5b544ea..784fef3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -19,33 +19,34 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval less or equal. */
-public class LessOrEqual implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval less or equal. */
+public class LessOrEqual extends LeafBinaryFunction {
 
     public static final LessOrEqual INSTANCE = new LessOrEqual();
 
     private LessOrEqual() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        Object field = values[index];
-        return field != null && literal.compareValueTo(field) >= 0;
+    public boolean test(LogicalType type, Object field, Object literal) {
+        return field != null && compareLiteral(type, literal, field) >= 0;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        FieldStats stats = fieldStats[index];
-        if (rowCount == stats.nullCount()) {
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+        if (rowCount == fieldStats.nullCount()) {
             return false;
         }
-        return literal.compareValueTo(stats.minValue()) >= 0;
+        return compareLiteral(type, literal, fieldStats.minValue()) >= 0;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(GreaterThan.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(GreaterThan.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
index 7fc3f2c..5ef2175 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -19,33 +19,34 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval less. */
-public class LessThan implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval less than. */
+public class LessThan extends LeafBinaryFunction {
 
     public static final LessThan INSTANCE = new LessThan();
 
     private LessThan() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        Object field = values[index];
-        return field != null && literal.compareValueTo(field) > 0;
+    public boolean test(LogicalType type, Object field, Object literal) {
+        return field != null && compareLiteral(type, literal, field) > 0;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        FieldStats stats = fieldStats[index];
-        if (rowCount == stats.nullCount()) {
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+        if (rowCount == fieldStats.nullCount()) {
             return false;
         }
-        return literal.compareValueTo(stats.minValue()) > 0;
+        return compareLiteral(type, literal, fieldStats.minValue()) > 0;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(GreaterOrEqual.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(GreaterOrEqual.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
deleted file mode 100644
index 4cbdb09..0000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.predicate;
-
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
-import java.util.Objects;
-
-/** A serializable literal class. */
-public class Literal implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final BytePrimitiveArrayComparator BINARY_COMPARATOR =
-            new BytePrimitiveArrayComparator(true);
-
-    private final LogicalType type;
-
-    private transient Object value;
-
-    public Literal(LogicalType type, Object value) {
-        this.type = type;
-        this.value = value;
-    }
-
-    public LogicalType type() {
-        return type;
-    }
-
-    public Object value() {
-        return value;
-    }
-
-    public int compareValueTo(Object o) {
-        if (value instanceof Comparable) {
-            return ((Comparable<Object>) value).compareTo(o);
-        } else if (value instanceof byte[]) {
-            return BINARY_COMPARATOR.compare((byte[]) value, (byte[]) o);
-        } else {
-            throw new RuntimeException("Unsupported type: " + type);
-        }
-    }
-
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
-        InternalSerializers.create(type).serialize(value, new DataOutputViewStreamWrapper(out));
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.defaultReadObject();
-        value = InternalSerializers.create(type).deserialize(new DataInputViewStreamWrapper(in));
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof Literal)) {
-            return false;
-        }
-        Literal literal = (Literal) o;
-        return type.equals(literal.type) && value.equals(literal.value);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(type, value);
-    }
-
-    public static Literal fromJavaObject(LogicalType literalType, Object o) {
-        if (o == null) {
-            throw new UnsupportedOperationException("Null literals are currently unsupported");
-        }
-        switch (literalType.getTypeRoot()) {
-            case BOOLEAN:
-                return new Literal(literalType, o);
-            case BIGINT:
-                return new Literal(literalType, ((Number) o).longValue());
-            case DOUBLE:
-                return new Literal(literalType, ((Number) o).doubleValue());
-            case TINYINT:
-                return new Literal(literalType, ((Number) o).byteValue());
-            case SMALLINT:
-                return new Literal(literalType, ((Number) o).shortValue());
-            case INTEGER:
-                return new Literal(literalType, ((Number) o).intValue());
-            case FLOAT:
-                return new Literal(literalType, ((Number) o).floatValue());
-            case VARCHAR:
-                return new Literal(literalType, StringData.fromString(o.toString()));
-            case DATE:
-                // Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to Date
-                // Which uses `java.util.Date()` internally to create the object and that uses the
-                // TimeZone.getDefaultRef()
-                // To get back the expected date we have to use the LocalDate which gets rid of the
-                // TimeZone misery as it uses the year/month/day to generate the object
-                LocalDate localDate;
-                if (o instanceof Timestamp) {
-                    localDate = ((Timestamp) o).toLocalDateTime().toLocalDate();
-                } else if (o instanceof Date) {
-                    localDate = ((Date) o).toLocalDate();
-                } else if (o instanceof LocalDate) {
-                    localDate = (LocalDate) o;
-                } else {
-                    throw new UnsupportedOperationException(
-                            "Unexpected date literal of class " + o.getClass().getName());
-                }
-                LocalDate epochDay =
-                        Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).toLocalDate();
-                int numberOfDays = (int) ChronoUnit.DAYS.between(epochDay, localDate);
-                return new Literal(literalType, numberOfDays);
-            case DECIMAL:
-                DecimalType decimalType = (DecimalType) literalType;
-                int precision = decimalType.getPrecision();
-                int scale = decimalType.getScale();
-                return new Literal(
-                        literalType, DecimalData.fromBigDecimal((BigDecimal) o, precision, scale));
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                TimestampData timestampData;
-                if (o instanceof Timestamp) {
-                    timestampData = TimestampData.fromTimestamp((Timestamp) o);
-                } else if (o instanceof Instant) {
-                    timestampData = TimestampData.fromInstant((Instant) o);
-                } else if (o instanceof LocalDateTime) {
-                    timestampData = TimestampData.fromLocalDateTime((LocalDateTime) o);
-                } else {
-                    throw new UnsupportedOperationException("Unsupported object: " + o);
-                }
-                return new Literal(literalType, timestampData);
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported predicate leaf type " + literalType.getTypeRoot().name());
-        }
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
index f3e9066..7137951 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -19,34 +19,35 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/** A {@link LeafPredicate.Function} to eval not equal. */
-public class NotEqual implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval not equal. */
+public class NotEqual extends LeafBinaryFunction {
 
     public static final NotEqual INSTANCE = new NotEqual();
 
     private NotEqual() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal literal) {
-        Object field = values[index];
-        return field != null && literal.compareValueTo(field) != 0;
+    public boolean test(LogicalType type, Object field, Object literal) {
+        return field != null && compareLiteral(type, literal, field) != 0;
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
-        FieldStats stats = fieldStats[index];
-        if (rowCount == stats.nullCount()) {
+    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+        if (rowCount == fieldStats.nullCount()) {
             return false;
         }
-        return literal.compareValueTo(stats.minValue()) != 0
-                || literal.compareValueTo(stats.maxValue()) != 0;
+        return compareLiteral(type, literal, fieldStats.minValue()) != 0
+                || compareLiteral(type, literal, fieldStats.maxValue()) != 0;
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
-        return Optional.of(new LeafPredicate(Equal.INSTANCE, index, literal));
+    public Optional<LeafFunction> negate() {
+        return Optional.of(Equal.INSTANCE);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index 03285ee..620900e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -18,49 +18,98 @@
 
 package org.apache.flink.table.store.file.predicate;
 
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
+import static java.util.Collections.singletonList;
+
 /** A utility class to create {@link Predicate} object for common filter conditions. */
 public class PredicateBuilder {
 
-    public static Predicate equal(int idx, Literal literal) {
-        return new LeafPredicate(Equal.INSTANCE, idx, literal);
+    private final RowType rowType;
+
+    public PredicateBuilder(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    public Predicate equal(int idx, Object literal) {
+        return leaf(Equal.INSTANCE, idx, literal);
+    }
+
+    public Predicate notEqual(int idx, Object literal) {
+        return leaf(NotEqual.INSTANCE, idx, literal);
     }
 
-    public static Predicate notEqual(int idx, Literal literal) {
-        return new LeafPredicate(NotEqual.INSTANCE, idx, literal);
+    public Predicate lessThan(int idx, Object literal) {
+        return leaf(LessThan.INSTANCE, idx, literal);
     }
 
-    public static Predicate lessThan(int idx, Literal literal) {
-        return new LeafPredicate(LessThan.INSTANCE, idx, literal);
+    public Predicate lessOrEqual(int idx, Object literal) {
+        return leaf(LessOrEqual.INSTANCE, idx, literal);
     }
 
-    public static Predicate lessOrEqual(int idx, Literal literal) {
-        return new LeafPredicate(LessOrEqual.INSTANCE, idx, literal);
+    public Predicate greaterThan(int idx, Object literal) {
+        return leaf(GreaterThan.INSTANCE, idx, literal);
     }
 
-    public static Predicate greaterThan(int idx, Literal literal) {
-        return new LeafPredicate(GreaterThan.INSTANCE, idx, literal);
+    public Predicate greaterOrEqual(int idx, Object literal) {
+        return leaf(GreaterOrEqual.INSTANCE, idx, literal);
     }
 
-    public static Predicate greaterOrEqual(int idx, Literal literal) {
-        return new LeafPredicate(GreaterOrEqual.INSTANCE, idx, literal);
+    public Predicate isNull(int idx) {
+        return leaf(IsNull.INSTANCE, idx);
     }
 
-    public static Predicate isNull(int idx) {
-        return new LeafPredicate(IsNull.INSTANCE, idx, null);
+    public Predicate isNotNull(int idx) {
+        return leaf(IsNotNull.INSTANCE, idx);
     }
 
-    public static Predicate isNotNull(int idx) {
-        return new LeafPredicate(IsNotNull.INSTANCE, idx, null);
+    public Predicate startsWith(int idx, Object patternLiteral) {
+        return leaf(StartsWith.INSTANCE, idx, patternLiteral);
     }
 
-    public static Predicate startsWith(int idx, Literal patternLiteral) {
-        return new LeafPredicate(StartsWith.INSTANCE, idx, patternLiteral);
+    public Predicate leaf(LeafBinaryFunction function, int idx, Object literal) {
+        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, singletonList(literal));
+    }
+
+    public Predicate leaf(LeafUnaryFunction function, int idx) {
+        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, Collections.emptyList());
+    }
+
+    public Predicate in(int idx, List<Object> literals) {
+        Preconditions.checkArgument(
+                literals.size() > 0,
+                "There must be at least 1 literal to construct an IN predicate");
+        return literals.stream()
+                .map(l -> equal(idx, l))
+                .reduce((a, b) -> new CompoundPredicate(Or.INSTANCE, Arrays.asList(a, b)))
+                .get();
+    }
+
+    public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) {
+        return new CompoundPredicate(
+                And.INSTANCE,
+                Arrays.asList(
+                        greaterOrEqual(idx, includedLowerBound),
+                        lessOrEqual(idx, includedUpperBound)));
     }
 
     public static Predicate and(Predicate... predicates) {
@@ -71,6 +120,9 @@ public class PredicateBuilder {
         Preconditions.checkArgument(
                 predicates.size() > 0,
                 "There must be at least 1 inner predicate to construct an AND predicate");
+        if (predicates.size() == 1) {
+            return predicates.get(0);
+        }
         return predicates.stream()
                 .reduce((a, b) -> new CompoundPredicate(And.INSTANCE, Arrays.asList(a, b)))
                 .get();
@@ -89,25 +141,6 @@ public class PredicateBuilder {
                 .get();
     }
 
-    public static Predicate in(int idx, List<Literal> literals) {
-        Preconditions.checkArgument(
-                literals.size() > 0,
-                "There must be at least 1 literal to construct an IN predicate");
-        return literals.stream()
-                .map(l -> equal(idx, l))
-                .reduce((a, b) -> new CompoundPredicate(Or.INSTANCE, Arrays.asList(a, b)))
-                .get();
-    }
-
-    public static Predicate between(
-            int idx, Literal includedLowerBound, Literal includedUpperBound) {
-        return new CompoundPredicate(
-                And.INSTANCE,
-                Arrays.asList(
-                        greaterOrEqual(idx, includedLowerBound),
-                        lessOrEqual(idx, includedUpperBound)));
-    }
-
     public static List<Predicate> splitAnd(Predicate predicate) {
         List<Predicate> result = new ArrayList<>();
         splitAnd(predicate, result);
@@ -124,4 +157,69 @@ public class PredicateBuilder {
             result.add(predicate);
         }
     }
+
+    public static Object convertJavaObject(LogicalType literalType, Object o) {
+        if (o == null) {
+            throw new UnsupportedOperationException("Null literals are currently unsupported");
+        }
+        switch (literalType.getTypeRoot()) {
+            case BOOLEAN:
+                return o;
+            case BIGINT:
+                return ((Number) o).longValue();
+            case DOUBLE:
+                return ((Number) o).doubleValue();
+            case TINYINT:
+                return ((Number) o).byteValue();
+            case SMALLINT:
+                return ((Number) o).shortValue();
+            case INTEGER:
+                return ((Number) o).intValue();
+            case FLOAT:
+                return ((Number) o).floatValue();
+            case VARCHAR:
+                return StringData.fromString(o.toString());
+            case DATE:
+                // Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to Date
+                // Which uses `java.util.Date()` internally to create the object and that uses the
+                // TimeZone.getDefaultRef()
+                // To get back the expected date we have to use the LocalDate which gets rid of the
+                // TimeZone misery as it uses the year/month/day to generate the object
+                LocalDate localDate;
+                if (o instanceof Timestamp) {
+                    localDate = ((Timestamp) o).toLocalDateTime().toLocalDate();
+                } else if (o instanceof Date) {
+                    localDate = ((Date) o).toLocalDate();
+                } else if (o instanceof LocalDate) {
+                    localDate = (LocalDate) o;
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unexpected date literal of class " + o.getClass().getName());
+                }
+                LocalDate epochDay =
+                        Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).toLocalDate();
+                return (int) ChronoUnit.DAYS.between(epochDay, localDate);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) literalType;
+                int precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                return DecimalData.fromBigDecimal((BigDecimal) o, precision, scale);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                TimestampData timestampData;
+                if (o instanceof Timestamp) {
+                    timestampData = TimestampData.fromTimestamp((Timestamp) o);
+                } else if (o instanceof Instant) {
+                    timestampData = TimestampData.fromInstant((Instant) o);
+                } else if (o instanceof LocalDateTime) {
+                    timestampData = TimestampData.fromLocalDateTime((LocalDateTime) o);
+                } else {
+                    throw new UnsupportedOperationException("Unsupported object: " + o);
+                }
+                return timestampData;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported predicate leaf type " + literalType.getTypeRoot().name());
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index 377d0fa..2683561 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
 
 import javax.annotation.Nullable;
 
@@ -51,29 +50,19 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
 /** Convert {@link Expression} to {@link Predicate}. */
 public class PredicateConverter implements ExpressionVisitor<Predicate> {
 
-    public static final PredicateConverter CONVERTER = new PredicateConverter();
+    private final PredicateBuilder builder;
 
-    /** Accepts simple LIKE patterns like "abc%". */
-    private static final Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
+    public PredicateConverter(RowType type) {
+        this(new PredicateBuilder(type));
+    }
 
-    @Nullable
-    public Predicate fromMap(Map<String, String> map, RowType rowType) {
-        // TODO: It is somewhat misleading that an empty map creates a null predicate filter
-        List<String> fieldNames = rowType.getFieldNames();
-        Predicate predicate = null;
-        for (Map.Entry<String, String> entry : map.entrySet()) {
-            int idx = fieldNames.indexOf(entry.getKey());
-            LogicalType type = rowType.getTypeAt(idx);
-            Literal literal = new Literal(type, TypeUtils.castFromString(entry.getValue(), type));
-            if (predicate == null) {
-                predicate = PredicateBuilder.equal(idx, literal);
-            } else {
-                predicate = PredicateBuilder.and(predicate, PredicateBuilder.equal(idx, literal));
-            }
-        }
-        return predicate;
+    public PredicateConverter(PredicateBuilder builder) {
+        this.builder = builder;
     }
 
+    /** Accepts simple LIKE patterns like "abc%". */
+    private static final Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
+
     @Override
     public Predicate visit(CallExpression call) {
         FunctionDefinition func = call.getFunctionDefinition();
@@ -84,31 +73,26 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
         } else if (func == BuiltInFunctionDefinitions.OR) {
             return PredicateBuilder.or(children.get(0).accept(this), children.get(1).accept(this));
         } else if (func == BuiltInFunctionDefinitions.EQUALS) {
-            return visitBiFunction(children, PredicateBuilder::equal, PredicateBuilder::equal);
+            return visitBiFunction(children, builder::equal, builder::equal);
         } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
-            return visitBiFunction(
-                    children, PredicateBuilder::notEqual, PredicateBuilder::notEqual);
+            return visitBiFunction(children, builder::notEqual, builder::notEqual);
         } else if (func == BuiltInFunctionDefinitions.GREATER_THAN) {
-            return visitBiFunction(
-                    children, PredicateBuilder::greaterThan, PredicateBuilder::lessThan);
+            return visitBiFunction(children, builder::greaterThan, builder::lessThan);
         } else if (func == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
-            return visitBiFunction(
-                    children, PredicateBuilder::greaterOrEqual, PredicateBuilder::lessOrEqual);
+            return visitBiFunction(children, builder::greaterOrEqual, builder::lessOrEqual);
         } else if (func == BuiltInFunctionDefinitions.LESS_THAN) {
-            return visitBiFunction(
-                    children, PredicateBuilder::lessThan, PredicateBuilder::greaterThan);
+            return visitBiFunction(children, builder::lessThan, builder::greaterThan);
         } else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
-            return visitBiFunction(
-                    children, PredicateBuilder::lessOrEqual, PredicateBuilder::greaterOrEqual);
+            return visitBiFunction(children, builder::lessOrEqual, builder::greaterOrEqual);
         } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
             return extractFieldReference(children.get(0))
                     .map(FieldReferenceExpression::getFieldIndex)
-                    .map(PredicateBuilder::isNull)
+                    .map(builder::isNull)
                     .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
             return extractFieldReference(children.get(0))
                     .map(FieldReferenceExpression::getFieldIndex)
-                    .map(PredicateBuilder::isNotNull)
+                    .map(builder::isNotNull)
                     .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.LIKE) {
             FieldReferenceExpression fieldRefExpr =
@@ -120,14 +104,12 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
                 String sqlPattern =
                         extractLiteral(fieldRefExpr.getOutputDataType(), children.get(1))
                                 .orElseThrow(UnsupportedExpression::new)
-                                .value()
                                 .toString();
                 String escape =
                         children.size() <= 2
                                 ? null
                                 : extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2))
                                         .orElseThrow(UnsupportedExpression::new)
-                                        .value()
                                         .toString();
                 String escapedSqlPattern = sqlPattern;
                 boolean allowQuick = false;
@@ -171,11 +153,9 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
                 if (allowQuick) {
                     Matcher beginMatcher = BEGIN_PATTERN.matcher(escapedSqlPattern);
                     if (beginMatcher.matches()) {
-                        return PredicateBuilder.startsWith(
+                        return builder.startsWith(
                                 fieldRefExpr.getFieldIndex(),
-                                new Literal(
-                                        VarCharType.STRING_TYPE,
-                                        BinaryStringData.fromString(beginMatcher.group(1))));
+                                BinaryStringData.fromString(beginMatcher.group(1)));
                     }
                 }
             }
@@ -188,10 +168,10 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
 
     private Predicate visitBiFunction(
             List<Expression> children,
-            BiFunction<Integer, Literal, Predicate> visit1,
-            BiFunction<Integer, Literal, Predicate> visit2) {
+            BiFunction<Integer, Object, Predicate> visit1,
+            BiFunction<Integer, Object, Predicate> visit2) {
         Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0));
-        Optional<Literal> literal;
+        Optional<Object> literal;
         if (fieldRefExpr.isPresent()) {
             literal = extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1));
             if (literal.isPresent()) {
@@ -217,12 +197,12 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
         return Optional.empty();
     }
 
-    private Optional<Literal> extractLiteral(DataType expectedType, Expression expression) {
+    private Optional<Object> extractLiteral(DataType expectedType, Expression expression) {
         LogicalType expectedLogicalType = expectedType.getLogicalType();
         if (!supportsPredicate(expectedLogicalType)) {
             return Optional.empty();
         }
-        Literal literal = null;
+        Object literal = null;
         if (expression instanceof ValueLiteralExpression) {
             ValueLiteralExpression valueExpression = (ValueLiteralExpression) expression;
             DataType actualType = valueExpression.getOutputDataType();
@@ -233,14 +213,11 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
             }
             Object value = valueOpt.get();
             if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) {
-                literal =
-                        new Literal(
-                                expectedLogicalType,
-                                getConverter(expectedType).toInternalOrNull(value));
+                literal = getConverter(expectedType).toInternalOrNull(value);
             } else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {
                 try {
                     value = TypeUtils.castFromString(value.toString(), expectedLogicalType);
-                    literal = new Literal(expectedLogicalType, value);
+                    literal = value;
                 } catch (Exception ignored) {
                     // ignore here, let #visit throw UnsupportedExpression
                 }
@@ -303,14 +280,47 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
      * @param filter a resolved expression
      * @return {@link Predicate} if no {@link UnsupportedExpression} thrown.
      */
-    public static Optional<Predicate> convert(ResolvedExpression filter) {
+    public static Optional<Predicate> convert(RowType rowType, ResolvedExpression filter) {
         try {
-            return Optional.ofNullable(filter.accept(PredicateConverter.CONVERTER));
+            return Optional.ofNullable(filter.accept(new PredicateConverter(rowType)));
         } catch (UnsupportedExpression e) {
             return Optional.empty();
         }
     }
 
+    /**
+     * Builds the conjunction of {@code field = value} predicates described by a spec map.
+     *
+     * <p>Each value is converted with {@code TypeUtils.castFromString} to the type of its field.
+     *
+     * @param map field name to the string form of the literal that field must equal
+     * @param rowType row type used to resolve field names to positions and types
+     * @return the AND of all equality predicates, or {@code null} if {@code map} is empty
+     * @throws IllegalArgumentException if a key of {@code map} is not a field of {@code rowType}
+     */
+    @Nullable
+    public static Predicate fromMap(Map<String, String> map, RowType rowType) {
+        // TODO: It is somewhat misleading that an empty map creates a null predicate filter
+        List<String> fieldNames = rowType.getFieldNames();
+        Predicate predicate = null;
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+            int idx = fieldNames.indexOf(entry.getKey());
+            if (idx < 0) {
+                // Fail with a readable message instead of an index error from getTypeAt(-1).
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Field '%s' does not exist in row type with fields %s.",
+                                entry.getKey(), fieldNames));
+            }
+            LogicalType type = rowType.getTypeAt(idx);
+            Object literal = TypeUtils.castFromString(entry.getValue(), type);
+            Predicate equal = builder.equal(idx, literal);
+            predicate = predicate == null ? equal : PredicateBuilder.and(predicate, equal);
+        }
+        return predicate;
+    }
+
     /** Encounter an unsupported expression, the caller can choose to ignore this filter branch. */
     public static class UnsupportedExpression extends RuntimeException {}
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
index af03163..cf1b3db 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
@@ -20,39 +20,38 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
 
-/**
- * A {@link LeafPredicate.Function} to evaluate {@code filter like 'abc%' or filter like 'abc_'}.
- */
-public class StartsWith implements LeafPredicate.Function {
+/** A {@link LeafBinaryFunction} to evaluate {@code filter like 'abc%' or filter like 'abc_'}. */
+public class StartsWith extends LeafBinaryFunction {
 
     public static final StartsWith INSTANCE = new StartsWith();
 
     private StartsWith() {}
 
     @Override
-    public boolean test(Object[] values, int index, Literal patternLiteral) {
-        BinaryStringData field = (BinaryStringData) values[index];
-        return field != null && field.startsWith((BinaryStringData) patternLiteral.value());
+    public boolean test(LogicalType type, Object field, Object patternLiteral) {
+        BinaryStringData fieldString = (BinaryStringData) field;
+        return fieldString != null && fieldString.startsWith((BinaryStringData) patternLiteral);
     }
 
     @Override
-    public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal patternLiteral) {
-        FieldStats stats = fieldStats[index];
-        if (rowCount == stats.nullCount()) {
+    public boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, Object patternLiteral) {
+        if (rowCount == fieldStats.nullCount()) {
             return false;
         }
-        BinaryStringData min = (BinaryStringData) stats.minValue();
-        BinaryStringData max = (BinaryStringData) stats.maxValue();
-        BinaryStringData pattern = (BinaryStringData) patternLiteral.value();
+        BinaryStringData min = (BinaryStringData) fieldStats.minValue();
+        BinaryStringData max = (BinaryStringData) fieldStats.maxValue();
+        BinaryStringData pattern = (BinaryStringData) patternLiteral;
         return (min.startsWith(pattern) || min.compareTo(pattern) <= 0)
                 && (max.startsWith(pattern) || max.compareTo(pattern) >= 0);
     }
 
     @Override
-    public Optional<Predicate> negate(int index, Literal literal) {
+    public Optional<LeafFunction> negate() {
         return Optional.empty();
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
index 9632d6d..90afb94 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
@@ -56,8 +56,7 @@ public class TableCompact {
     }
 
     public TableCompact withPartitions(Map<String, String> partitionSpec) {
-        scan.withPartitionFilter(
-                PredicateConverter.CONVERTER.fromMap(partitionSpec, partitionType));
+        scan.withPartitionFilter(PredicateConverter.fromMap(partitionSpec, partitionType));
         return this;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 4a0e52f..e858470 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -146,7 +146,10 @@ public abstract class TableScan {
             if (mapped >= 0) {
                 return Optional.of(
                         new LeafPredicate(
-                                leafPredicate.function(), mapped, leafPredicate.literal()));
+                                leafPredicate.function(),
+                                leafPredicate.type(),
+                                mapped,
+                                leafPredicate.literals()));
             } else {
                 return Optional.empty();
             }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index 74f7eca..201eaab 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -115,7 +115,7 @@ public class KeyValueFileStoreScanTest {
         KeyValueFileStoreScan scan = store.newScan();
         scan.withSnapshot(snapshot.id());
         scan.withKeyFilter(
-                PredicateBuilder.equal(0, new Literal(new IntType(false), wantedShopId)));
+                new PredicateBuilder(RowType.of(new IntType(false))).equal(0, wantedShopId));
 
         Map<BinaryRowData, BinaryRowData> expected =
                 store.toKvMap(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
index dd19099..39cd4b0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.store.file.predicate;
 
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -29,25 +32,30 @@ public class PredicateBuilderTest {
 
     @Test
     public void testSplitAnd() {
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        RowType.of(
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType()));
+
         Predicate child1 =
-                PredicateBuilder.or(
-                        PredicateBuilder.isNull(0),
-                        PredicateBuilder.isNull(1),
-                        PredicateBuilder.isNull(2));
+                PredicateBuilder.or(builder.isNull(0), builder.isNull(1), builder.isNull(2));
         Predicate child2 =
-                PredicateBuilder.and(
-                        PredicateBuilder.isNull(3),
-                        PredicateBuilder.isNull(4),
-                        PredicateBuilder.isNull(5));
-        Predicate child3 = PredicateBuilder.isNull(6);
+                PredicateBuilder.and(builder.isNull(3), builder.isNull(4), builder.isNull(5));
+        Predicate child3 = builder.isNull(6);
 
         assertThat(PredicateBuilder.splitAnd(PredicateBuilder.and(child1, child2, child3)))
                 .isEqualTo(
                         Arrays.asList(
                                 child1,
-                                PredicateBuilder.isNull(3),
-                                PredicateBuilder.isNull(4),
-                                PredicateBuilder.isNull(5),
+                                builder.isNull(3),
+                                builder.isNull(4),
+                                builder.isNull(5),
                                 child3));
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
index c138d2b..8106249 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -39,7 +42,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Test for {@link PredicateConverter}. */
 public class PredicateConverterTest {
 
-    private static final PredicateConverter CONVERTER = new PredicateConverter();
+    private static final PredicateBuilder BUILDER =
+            new PredicateBuilder(RowType.of(new BigIntType(), new DoubleType()));
+
+    private static final PredicateConverter CONVERTER = new PredicateConverter(BUILDER);
 
     @MethodSource("provideResolvedExpression")
     @ParameterizedTest
@@ -57,12 +63,12 @@ public class PredicateConverterTest {
         FieldReferenceExpression longRefExpr =
                 new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 0);
         ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
-        Literal longLit = new Literal(DataTypes.BIGINT().getLogicalType(), 10L);
+        long longLit = 10L;
 
         FieldReferenceExpression doubleRefExpr =
                 new FieldReferenceExpression("double1", DataTypes.DOUBLE(), 0, 1);
         ValueLiteralExpression floatLitExpr = new ValueLiteralExpression(3.14f);
-        Literal doubleLit = new Literal(DataTypes.DOUBLE().getLogicalType(), 3.14d);
+        double doubleLit = 3.14d;
 
         return Stream.of(
                 Arguments.of(longRefExpr, null),
@@ -72,50 +78,50 @@ public class PredicateConverterTest {
                                 BuiltInFunctionDefinitions.IS_NULL,
                                 Collections.singletonList(longRefExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.isNull(0)),
+                        BUILDER.isNull(0)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.IS_NOT_NULL,
                                 Collections.singletonList(doubleRefExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.isNotNull(1)),
+                        BUILDER.isNotNull(1)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.EQUALS,
                                 // test literal on left
                                 Arrays.asList(intLitExpr, longRefExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.equal(0, longLit)),
+                        BUILDER.equal(0, longLit)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.NOT_EQUALS,
                                 Arrays.asList(longRefExpr, intLitExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.notEqual(0, longLit)),
+                        BUILDER.notEqual(0, longLit)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.GREATER_THAN,
                                 Arrays.asList(longRefExpr, intLitExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.greaterThan(0, longLit)),
+                        BUILDER.greaterThan(0, longLit)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
                                 Arrays.asList(longRefExpr, intLitExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.greaterOrEqual(0, longLit)),
+                        BUILDER.greaterOrEqual(0, longLit)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.LESS_THAN,
                                 Arrays.asList(longRefExpr, intLitExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.lessThan(0, longLit)),
+                        BUILDER.lessThan(0, longLit)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
                                 Arrays.asList(longRefExpr, intLitExpr),
                                 DataTypes.BOOLEAN()),
-                        PredicateBuilder.lessOrEqual(0, longLit)),
+                        BUILDER.lessOrEqual(0, longLit)),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.AND,
@@ -130,8 +136,7 @@ public class PredicateConverterTest {
                                                 DataTypes.BOOLEAN())),
                                 DataTypes.BOOLEAN()),
                         PredicateBuilder.and(
-                                PredicateBuilder.lessOrEqual(0, longLit),
-                                PredicateBuilder.equal(1, doubleLit))),
+                                BUILDER.lessOrEqual(0, longLit), BUILDER.equal(1, doubleLit))),
                 Arguments.of(
                         CallExpression.permanent(
                                 BuiltInFunctionDefinitions.OR,
@@ -146,7 +151,6 @@ public class PredicateConverterTest {
                                                 DataTypes.BOOLEAN())),
                                 DataTypes.BOOLEAN()),
                         PredicateBuilder.or(
-                                PredicateBuilder.notEqual(0, longLit),
-                                PredicateBuilder.equal(1, doubleLit))));
+                                BUILDER.notEqual(0, longLit), BUILDER.equal(1, doubleLit))));
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 744127f..3d396ee 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -26,17 +26,9 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BinaryType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.CharType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.Row;
 
@@ -45,11 +37,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -58,7 +45,6 @@ import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
-import static org.apache.flink.table.store.file.predicate.PredicateConverter.CONVERTER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -67,9 +53,10 @@ public class PredicateTest {
 
     @Test
     public void testEqual() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(BuiltInFunctionDefinitions.EQUALS, field(0, DataTypes.INT()), literal(5));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -81,17 +68,15 @@ public class PredicateTest {
         assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
                 .isEqualTo(false);
 
-        assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.notEqual(
-                                0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.notEqual(0, 5));
     }
 
     @Test
     public void testNotEqual() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(BuiltInFunctionDefinitions.NOT_EQUALS, field(0, DataTypes.INT()), literal(5));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -104,20 +89,18 @@ public class PredicateTest {
         assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
                 .isEqualTo(false);
 
-        assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.equal(
-                                0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.equal(0, 5));
     }
 
     @Test
     public void testGreater() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.GREATER_THAN,
                         field(0, DataTypes.INT()),
                         literal(5));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -131,20 +114,18 @@ public class PredicateTest {
         assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
                 .isEqualTo(false);
 
-        assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.lessOrEqual(
-                                0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessOrEqual(0, 5));
     }
 
     @Test
     public void testGreaterOrEqual() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
                         field(0, DataTypes.INT()),
                         literal(5));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -158,17 +139,15 @@ public class PredicateTest {
         assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
                 .isEqualTo(false);
 
-        assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.lessThan(
-                                0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessThan(0, 5));
     }
 
     @Test
     public void testLess() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(BuiltInFunctionDefinitions.LESS_THAN, field(0, DataTypes.INT()), literal(5));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -181,20 +160,18 @@ public class PredicateTest {
         assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
                 .isEqualTo(false);
 
-        assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.greaterOrEqual(
-                                0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterOrEqual(0, 5));
     }
 
     @Test
     public void testLessOrEqual() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
                         field(0, DataTypes.INT()),
                         literal(5));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -207,20 +184,18 @@ public class PredicateTest {
         assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
                 .isEqualTo(false);
 
-        assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.greaterThan(
-                                0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterThan(0, 5));
     }
 
     @Test
     public void testIsNull() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.IS_NULL,
                         field(0, DataTypes.INT()),
                         literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(true);
@@ -228,17 +203,18 @@ public class PredicateTest {
         assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
         assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1)})).isEqualTo(true);
 
-        assertThat(predicate.negate().orElse(null)).isEqualTo(PredicateBuilder.isNotNull(0));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNotNull(0));
     }
 
     @Test
     public void testIsNotNull() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.IS_NOT_NULL,
                         field(0, DataTypes.INT()),
                         literal(null, DataTypes.INT()));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -248,11 +224,12 @@ public class PredicateTest {
         assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 3)}))
                 .isEqualTo(false);
 
-        assertThat(predicate.negate().orElse(null)).isEqualTo(PredicateBuilder.isNull(0));
+        assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNull(0));
     }
 
     @Test
     public void testAnd() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.AND,
@@ -264,7 +241,7 @@ public class PredicateTest {
                                 BuiltInFunctionDefinitions.EQUALS,
                                 field(1, DataTypes.INT()),
                                 literal(5)));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4, 5})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(false);
@@ -294,16 +271,12 @@ public class PredicateTest {
                 .isEqualTo(false);
 
         assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.or(
-                                PredicateBuilder.notEqual(
-                                        0, new Literal(DataTypes.INT().getLogicalType(), 3)),
-                                PredicateBuilder.notEqual(
-                                        1, new Literal(DataTypes.INT().getLogicalType(), 5))));
+                .isEqualTo(PredicateBuilder.or(builder.notEqual(0, 3), builder.notEqual(1, 5)));
     }
 
     @Test
     public void testOr() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.OR,
@@ -315,7 +288,7 @@ public class PredicateTest {
                                 BuiltInFunctionDefinitions.EQUALS,
                                 field(1, DataTypes.INT()),
                                 literal(5)));
-        Predicate predicate = expression.accept(CONVERTER);
+        Predicate predicate = expression.accept(new PredicateConverter(builder));
 
         assertThat(predicate.test(new Object[] {4, 6})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(true);
@@ -345,12 +318,7 @@ public class PredicateTest {
                 .isEqualTo(false);
 
         assertThat(predicate.negate().orElse(null))
-                .isEqualTo(
-                        PredicateBuilder.and(
-                                PredicateBuilder.notEqual(
-                                        0, new Literal(DataTypes.INT().getLogicalType(), 3)),
-                                PredicateBuilder.notEqual(
-                                        1, new Literal(DataTypes.INT().getLogicalType(), 5))));
+                .isEqualTo(PredicateBuilder.and(builder.notEqual(0, 3), builder.notEqual(1, 5)));
     }
 
     @MethodSource("provideLikeExpressions")
@@ -362,7 +330,8 @@ public class PredicateTest {
             List<Long> rowCountList,
             List<FieldStats[]> statsList,
             List<Boolean> expectedForStats) {
-        Predicate predicate = callExpression.accept(CONVERTER);
+        Predicate predicate =
+                callExpression.accept(new PredicateConverter(RowType.of(new VarCharType())));
         IntStream.range(0, valuesList.size())
                 .forEach(
                         i ->
@@ -388,12 +357,17 @@ public class PredicateTest {
                                 BuiltInFunctionDefinitions.SIMILAR,
                                 field(1, DataTypes.INT()),
                                 literal(5)));
-        assertThatThrownBy(() -> expression.accept(CONVERTER))
+        assertThatThrownBy(
+                        () ->
+                                expression.accept(
+                                        new PredicateConverter(
+                                                RowType.of(new IntType(), new IntType()))))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
     }
 
     @Test
     public void testUnsupportedStartsPatternForLike() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
         // starts pattern with '_' as wildcard
         assertThatThrownBy(
                         () ->
@@ -401,7 +375,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("abc_", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // starts pattern like 'abc%xyz' or 'abc_xyz'
@@ -411,7 +385,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("abc%xyz", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -419,7 +393,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("abc_xyz", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape
@@ -432,7 +406,7 @@ public class PredicateTest {
                                                         "=%abc=%%xyz=_",
                                                         STRING()), // matches "%abc%(?s:.*)xyz_"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -443,7 +417,7 @@ public class PredicateTest {
                                                         "abc=%%xyz",
                                                         STRING()), // matches "abc%(?s:.*)xyz"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -454,7 +428,7 @@ public class PredicateTest {
                                                         "abc=%_xyz",
                                                         STRING()), // matches "abc%.xyz"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -465,7 +439,7 @@ public class PredicateTest {
                                                         "abc=_%xyz",
                                                         STRING()), // matches "abc_(?s:.*)xyz"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -476,7 +450,7 @@ public class PredicateTest {
                                                         "abc=__xyz",
                                                         STRING()), // matches "abc_.xyz"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // starts pattern with wildcard '%' at the beginning to escape
@@ -487,12 +461,13 @@ public class PredicateTest {
                                                 field(0, STRING()),
                                                 literal("=%%", STRING()), // matches "%(?s:.*)"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
     }
 
     @Test
     public void testUnsupportedEndsPatternForLike() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
         // ends pattern with '%' as wildcard
         assertThatThrownBy(
                         () ->
@@ -500,7 +475,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("%456", STRING())) // matches "(?s:.*)456"
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // ends pattern with '_' as wildcard
@@ -510,7 +485,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("_456", STRING())) // matches ".456"
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // ends pattern with '[]' as wildcard
@@ -520,7 +495,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("_[456]", STRING())) // matches ".[456]"
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -530,7 +505,7 @@ public class PredicateTest {
                                                 literal(
                                                         "%[h-m]",
                                                         STRING())) // matches "(?s:.*)[h-m]"
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // ends pattern with '[^]' as wildcard
@@ -542,7 +517,7 @@ public class PredicateTest {
                                                 literal(
                                                         "%[^h-m]",
                                                         STRING())) // matches "(?s:.*)[^h-m]"
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         assertThatThrownBy(
@@ -551,7 +526,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("_[^xyz]", STRING())) // matches ".[^xyz]"
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // ends pattern escape wildcard '%'
@@ -564,7 +539,7 @@ public class PredicateTest {
                                                         "%=%456",
                                                         STRING()), // matches "(?s:.*)%456"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -575,7 +550,7 @@ public class PredicateTest {
                                                         "%=_456",
                                                         STRING()), // matches "(?s:.*)_456"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // ends pattern escape wildcard '_'
@@ -586,12 +561,13 @@ public class PredicateTest {
                                                 field(0, STRING()),
                                                 literal("_=_456", STRING()), // matches "._456"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
     }
 
     @Test
     public void testUnsupportedEqualsPatternForLike() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
         // equals pattern
         assertThatThrownBy(
                         () ->
@@ -599,7 +575,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("123456", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // equals pattern escape '%'
@@ -610,7 +586,7 @@ public class PredicateTest {
                                                 field(0, STRING()),
                                                 literal("12=%45", STRING()), // equals "12%45"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // equals pattern escape '_'
@@ -621,12 +597,13 @@ public class PredicateTest {
                                                 field(0, STRING()),
                                                 literal("12=_45", STRING()), // equals "12_45"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
     }
 
     @Test
     public void testUnsupportedMiddlePatternForLike() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
         // middle pattern with '%' as wildcard
         assertThatThrownBy(
                         () ->
@@ -634,7 +611,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("%345%", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // middle pattern with '_' as wildcard
@@ -644,7 +621,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("_345_", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // middle pattern with both '%' and '_' as wildcard
@@ -654,7 +631,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("_345%", STRING())) // matches ".345(?s:.*)"
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
         assertThatThrownBy(
                         () ->
@@ -662,7 +639,7 @@ public class PredicateTest {
                                                 BuiltInFunctionDefinitions.LIKE,
                                                 field(0, STRING()),
                                                 literal("%345_", STRING())) // matches "(?s:.*)345."
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // middle pattern with '[]' as wildcard
@@ -674,7 +651,7 @@ public class PredicateTest {
                                                 literal(
                                                         "%[a-c]_",
                                                         STRING())) // matches "(?s:.*)[a-c]."
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // middle pattern with '[^]' as wildcard
@@ -686,7 +663,7 @@ public class PredicateTest {
                                                 literal(
                                                         "%[^abc]_",
                                                         STRING())) // matches "(?s:.*)[^abc]."
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // middle pattern escape '%'
@@ -699,7 +676,7 @@ public class PredicateTest {
                                                         "%34=%5%",
                                                         STRING()), // matches "(?s:.*)34%5(.*)"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
 
         // middle pattern escape '_'
@@ -712,56 +689,23 @@ public class PredicateTest {
                                                         "%34=_5%",
                                                         STRING()), // matches "(?s:.*)34_5(.*)"
                                                 literal("=", STRING()))
-                                        .accept(CONVERTER))
+                                        .accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
     }
 
     @Test
     public void testUnsupportedType() {
+        PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
         DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
         CallExpression expression =
                 call(
                         BuiltInFunctionDefinitions.EQUALS,
                         field(0, structType),
                         literal(Row.of(1), structType));
-        assertThatThrownBy(() -> expression.accept(CONVERTER))
+        assertThatThrownBy(() -> expression.accept(converter))
                 .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
     }
 
-    @MethodSource("provideLiterals")
-    @ParameterizedTest
-    public void testSerDeLiteral(LogicalType type, Object data) throws Exception {
-        Literal literal = new Literal(type, data);
-        Object object = readObject(writeObject(literal));
-        assertThat(object).isInstanceOf(Literal.class);
-        assertThat(((Literal) object).type()).isEqualTo(literal.type());
-        assertThat(((Literal) object).compareValueTo(literal.value())).isEqualTo(0);
-    }
-
-    public static Stream<Arguments> provideLiterals() {
-        CharType charType = new CharType();
-        VarCharType varCharType = VarCharType.STRING_TYPE;
-        BooleanType booleanType = new BooleanType();
-        BinaryType binaryType = new BinaryType();
-        DecimalType decimalType = new DecimalType(2);
-        SmallIntType smallIntType = new SmallIntType();
-        BigIntType bigIntType = new BigIntType();
-        DoubleType doubleType = new DoubleType();
-        TimestampType timestampType = new TimestampType();
-        return Stream.of(
-                Arguments.of(charType, TypeUtils.castFromString("s", charType)),
-                Arguments.of(varCharType, TypeUtils.castFromString("AbCd1Xy%@*", varCharType)),
-                Arguments.of(booleanType, TypeUtils.castFromString("false", booleanType)),
-                Arguments.of(binaryType, TypeUtils.castFromString("0101", binaryType)),
-                Arguments.of(smallIntType, TypeUtils.castFromString("-2", smallIntType)),
-                Arguments.of(decimalType, TypeUtils.castFromString("22.10", decimalType)),
-                Arguments.of(bigIntType, TypeUtils.castFromString("-9999999999", bigIntType)),
-                Arguments.of(doubleType, TypeUtils.castFromString("3.14159265357", doubleType)),
-                Arguments.of(
-                        timestampType,
-                        TypeUtils.castFromString("2022-03-25 15:00:02", timestampType)));
-    }
-
     public static Stream<Arguments> provideLikeExpressions() {
         CallExpression expr1 =
                 call(
@@ -900,22 +844,6 @@ public class PredicateTest {
                         expectedForStats4));
     }
 
-    private byte[] writeObject(Literal literal) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(literal);
-        oos.close();
-        return baos.toByteArray();
-    }
-
-    private Object readObject(byte[] bytes) throws IOException, ClassNotFoundException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-        ObjectInputStream ois = new ObjectInputStream(bais);
-        Object object = ois.readObject();
-        ois.close();
-        return object;
-    }
-
     private static FieldReferenceExpression field(int i, DataType type) {
         return new FieldReferenceExpression("name", type, 0, i);
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 0ee8e79..f15bf5f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -20,11 +20,9 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.Schema;
@@ -79,10 +77,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
     public void testBatchFilter() throws Exception {
         writeData();
         FileStoreTable table = createFileStoreTable();
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate =
-                PredicateBuilder.equal(
-                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        Predicate predicate = builder.equal(2, 201L);
         List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
@@ -125,10 +122,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
     public void testStreamingFilter() throws Exception {
         writeData();
         FileStoreTable table = createFileStoreTable();
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate =
-                PredicateBuilder.equal(
-                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 101L));
+        Predicate predicate = builder.equal(2, 101L);
         List<Split> splits =
                 table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
         TableRead read = table.newRead().withIncremental(true);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index b6fa2ab..2c1509f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -20,11 +20,9 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.Schema;
@@ -79,10 +77,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
     public void testBatchFilter() throws Exception {
         writeData();
         FileStoreTable table = createFileStoreTable();
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate =
-                PredicateBuilder.equal(
-                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        Predicate predicate = builder.equal(2, 201L);
         List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
@@ -124,10 +121,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
     public void testStreamingFilter() throws Exception {
         writeData();
         FileStoreTable table = createFileStoreTable();
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate =
-                PredicateBuilder.equal(
-                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        Predicate predicate = builder.equal(2, 201L);
         List<Split> splits =
                 table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
         TableRead read = table.newRead().withIncremental(true);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 0846327..d248cc4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -20,11 +20,9 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.Schema;
@@ -79,14 +77,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
     public void testBatchFilter() throws Exception {
         writeData();
         FileStoreTable table = createFileStoreTable();
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate =
-                PredicateBuilder.and(
-                        PredicateBuilder.equal(
-                                2,
-                                Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L)),
-                        PredicateBuilder.equal(
-                                1, Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21)));
+        Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), builder.equal(1, 21));
         List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
@@ -129,14 +122,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
     public void testStreamingFilter() throws Exception {
         writeData();
         FileStoreTable table = createFileStoreTable();
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
 
-        Predicate predicate =
-                PredicateBuilder.and(
-                        PredicateBuilder.equal(
-                                2,
-                                Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L)),
-                        PredicateBuilder.equal(
-                                1, Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21)));
+        Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), builder.equal(1, 21));
         List<Split> splits =
                 table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
         TableRead read = table.newRead().withIncremental(true);
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
index 9a090c2..4c4ef43 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.table.store;
 
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
@@ -35,6 +35,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
+
 /** Converts {@link SearchArgument} to {@link Predicate} with best effort. */
 public class SearchArgumentToPredicateConverter {
 
@@ -45,6 +47,7 @@ public class SearchArgumentToPredicateConverter {
     private final List<PredicateLeaf> leaves;
     private final List<String> columnNames;
     private final List<LogicalType> columnTypes;
+    private final PredicateBuilder builder;
 
     public SearchArgumentToPredicateConverter(
             SearchArgument sarg, List<String> columnNames, List<LogicalType> columnTypes) {
@@ -52,6 +55,11 @@ public class SearchArgumentToPredicateConverter {
         this.leaves = sarg.getLeaves();
         this.columnNames = columnNames;
         this.columnTypes = columnTypes;
+        this.builder =
+                new PredicateBuilder(
+                        RowType.of(
+                                columnTypes.toArray(new LogicalType[0]),
+                                columnNames.toArray(new String[0])));
     }
 
     public Optional<Predicate> convert() {
@@ -98,35 +106,35 @@ public class SearchArgumentToPredicateConverter {
         LogicalType columnType = columnTypes.get(idx);
         switch (leaf.getOperator()) {
             case EQUALS:
-                return PredicateBuilder.equal(idx, toLiteral(columnType, leaf.getLiteral()));
+                return builder.equal(idx, toLiteral(columnType, leaf.getLiteral()));
             case LESS_THAN:
-                return PredicateBuilder.lessThan(idx, toLiteral(columnType, leaf.getLiteral()));
+                return builder.lessThan(idx, toLiteral(columnType, leaf.getLiteral()));
             case LESS_THAN_EQUALS:
-                return PredicateBuilder.lessOrEqual(idx, toLiteral(columnType, leaf.getLiteral()));
+                return builder.lessOrEqual(idx, toLiteral(columnType, leaf.getLiteral()));
             case IN:
-                return PredicateBuilder.in(
+                return builder.in(
                         idx,
                         leaf.getLiteralList().stream()
                                 .map(o -> toLiteral(columnType, o))
                                 .collect(Collectors.toList()));
             case BETWEEN:
                 List<Object> literalList = leaf.getLiteralList();
-                return PredicateBuilder.between(
+                return builder.between(
                         idx,
                         toLiteral(columnType, literalList.get(0)),
                         toLiteral(columnType, literalList.get(1)));
             case IS_NULL:
-                return PredicateBuilder.isNull(idx);
+                return builder.isNull(idx);
             default:
                 throw new UnsupportedOperationException(
                         "Unsupported operator " + leaf.getOperator());
         }
     }
 
-    private Literal toLiteral(LogicalType literalType, Object o) {
+    private Object toLiteral(LogicalType literalType, Object o) {
         if (o instanceof HiveDecimalWritable) {
             o = ((HiveDecimalWritable) o).getHiveDecimal().bigDecimalValue();
         }
-        return Literal.fromJavaObject(literalType, o);
+        return convertJavaObject(literalType, o);
     }
 }
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index 54583ae..ca576e0 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -22,10 +22,10 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -94,7 +94,7 @@ public class SearchArgumentToPredicateConverterTest {
                 new SearchArgumentToPredicateConverter(
                         sarg, Collections.singletonList("a"), Collections.singletonList(flinkType));
 
-        Predicate expected = PredicateBuilder.equal(0, new Literal(flinkType, flinkLiteral));
+        Predicate expected = new PredicateBuilder(RowType.of(flinkType)).equal(0, flinkLiteral);
         Predicate actual = converter.convert().orElse(null);
         assertThat(actual).isEqualTo(expected);
     }
@@ -106,12 +106,17 @@ public class SearchArgumentToPredicateConverterTest {
                     DataTypes.BIGINT().getLogicalType(),
                     DataTypes.DOUBLE().getLogicalType());
     private static final LogicalType BIGINT_TYPE = DataTypes.BIGINT().getLogicalType();
+    private static final PredicateBuilder BUILDER =
+            new PredicateBuilder(
+                    RowType.of(
+                            COLUMN_TYPES.toArray(new LogicalType[0]),
+                            COLUMN_NAMES.toArray(new String[0])));
 
     @Test
     public void testEqual() {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg = builder.equals("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
-        Predicate expected = PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L));
+        Predicate expected = BUILDER.equal(1, 100L);
         assertExpected(sarg, expected);
     }
 
@@ -120,7 +125,7 @@ public class SearchArgumentToPredicateConverterTest {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg =
                 builder.startNot().equals("f_bigint", PredicateLeaf.Type.LONG, 100L).end().build();
-        Predicate expected = PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L));
+        Predicate expected = BUILDER.notEqual(1, 100L);
         assertExpected(sarg, expected);
     }
 
@@ -128,7 +133,7 @@ public class SearchArgumentToPredicateConverterTest {
     public void testLessThan() {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg = builder.lessThan("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
-        Predicate expected = PredicateBuilder.lessThan(1, new Literal(BIGINT_TYPE, 100L));
+        Predicate expected = BUILDER.lessThan(1, 100L);
         assertExpected(sarg, expected);
     }
 
@@ -140,7 +145,7 @@ public class SearchArgumentToPredicateConverterTest {
                         .lessThan("f_bigint", PredicateLeaf.Type.LONG, 100L)
                         .end()
                         .build();
-        Predicate expected = PredicateBuilder.greaterOrEqual(1, new Literal(BIGINT_TYPE, 100L));
+        Predicate expected = BUILDER.greaterOrEqual(1, 100L);
         assertExpected(sarg, expected);
     }
 
@@ -149,7 +154,7 @@ public class SearchArgumentToPredicateConverterTest {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg =
                 builder.lessThanEquals("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
-        Predicate expected = PredicateBuilder.lessOrEqual(1, new Literal(BIGINT_TYPE, 100L));
+        Predicate expected = BUILDER.lessOrEqual(1, 100L);
         assertExpected(sarg, expected);
     }
 
@@ -161,7 +166,7 @@ public class SearchArgumentToPredicateConverterTest {
                         .lessThanEquals("f_bigint", PredicateLeaf.Type.LONG, 100L)
                         .end()
                         .build();
-        Predicate expected = PredicateBuilder.greaterThan(1, new Literal(BIGINT_TYPE, 100L));
+        Predicate expected = BUILDER.greaterThan(1, 100L);
         assertExpected(sarg, expected);
     }
 
@@ -172,9 +177,7 @@ public class SearchArgumentToPredicateConverterTest {
                 builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L).build();
         Predicate expected =
                 PredicateBuilder.or(
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 200L)),
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 300L)));
+                        BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -188,9 +191,9 @@ public class SearchArgumentToPredicateConverterTest {
                         .build();
         Predicate expected =
                 PredicateBuilder.and(
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 200L)),
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 300L)));
+                        BUILDER.notEqual(1, 100L),
+                        BUILDER.notEqual(1, 200L),
+                        BUILDER.notEqual(1, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -200,9 +203,7 @@ public class SearchArgumentToPredicateConverterTest {
         SearchArgument sarg =
                 builder.between("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L).build();
         Predicate expected =
-                PredicateBuilder.and(
-                        PredicateBuilder.greaterOrEqual(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.lessOrEqual(1, new Literal(BIGINT_TYPE, 200L)));
+                PredicateBuilder.and(BUILDER.greaterOrEqual(1, 100L), BUILDER.lessOrEqual(1, 200L));
         assertExpected(sarg, expected);
     }
 
@@ -215,9 +216,7 @@ public class SearchArgumentToPredicateConverterTest {
                         .end()
                         .build();
         Predicate expected =
-                PredicateBuilder.or(
-                        PredicateBuilder.lessThan(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.greaterThan(1, new Literal(BIGINT_TYPE, 200L)));
+                PredicateBuilder.or(BUILDER.lessThan(1, 100L), BUILDER.greaterThan(1, 200L));
         assertExpected(sarg, expected);
     }
 
@@ -225,7 +224,7 @@ public class SearchArgumentToPredicateConverterTest {
     public void testIsNull() {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg = builder.isNull("f_bigint", PredicateLeaf.Type.LONG).build();
-        Predicate expected = PredicateBuilder.isNull(1);
+        Predicate expected = BUILDER.isNull(1);
         assertExpected(sarg, expected);
     }
 
@@ -234,7 +233,7 @@ public class SearchArgumentToPredicateConverterTest {
         SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
         SearchArgument sarg =
                 builder.startNot().isNull("f_bigint", PredicateLeaf.Type.LONG).end().build();
-        Predicate expected = PredicateBuilder.isNotNull(1);
+        Predicate expected = BUILDER.isNotNull(1);
         assertExpected(sarg, expected);
     }
 
@@ -250,9 +249,7 @@ public class SearchArgumentToPredicateConverterTest {
                         .build();
         Predicate expected =
                 PredicateBuilder.or(
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 200L)),
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 300L)));
+                        BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -270,9 +267,9 @@ public class SearchArgumentToPredicateConverterTest {
                         .build();
         Predicate expected =
                 PredicateBuilder.and(
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 200L)),
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 300L)));
+                        BUILDER.notEqual(1, 100L),
+                        BUILDER.notEqual(1, 200L),
+                        BUILDER.notEqual(1, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -294,9 +291,9 @@ public class SearchArgumentToPredicateConverterTest {
                         .build();
         Predicate expected =
                 PredicateBuilder.and(
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 200L)),
-                        PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 300L)));
+                        BUILDER.notEqual(1, 100L),
+                        BUILDER.notEqual(1, 200L),
+                        BUILDER.notEqual(1, 300L));
         assertExpected(sarg, expected);
     }
 
@@ -320,9 +317,7 @@ public class SearchArgumentToPredicateConverterTest {
                         .build();
         Predicate expected =
                 PredicateBuilder.or(
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L)),
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 200L)),
-                        PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 300L)));
+                        BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
         assertExpected(sarg, expected);
     }
 
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
index ec077c2..6213ca3 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.spark;
 
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -37,13 +36,17 @@ import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.Or;
 import org.apache.spark.sql.sources.StringStartsWith;
 
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
+
 /** Conversion from {@link Filter} to {@link Predicate}. */
 public class SparkFilterConverter {
 
     private final RowType rowType;
+    private final PredicateBuilder builder;
 
     public SparkFilterConverter(RowType rowType) {
         this.rowType = rowType;
+        this.builder = new PredicateBuilder(rowType);
     }
 
     public Predicate convert(Filter filter) {
@@ -51,32 +54,32 @@ public class SparkFilterConverter {
             EqualTo eq = (EqualTo) filter;
             // TODO deal with isNaN
             int index = fieldIndex(eq.attribute());
-            Literal literal = convertLiteral(index, eq.value());
-            return PredicateBuilder.equal(index, literal);
+            Object literal = convertLiteral(index, eq.value());
+            return builder.equal(index, literal);
         } else if (filter instanceof GreaterThan) {
             GreaterThan gt = (GreaterThan) filter;
             int index = fieldIndex(gt.attribute());
-            Literal literal = convertLiteral(index, gt.value());
-            return PredicateBuilder.greaterThan(index, literal);
+            Object literal = convertLiteral(index, gt.value());
+            return builder.greaterThan(index, literal);
         } else if (filter instanceof GreaterThanOrEqual) {
             GreaterThanOrEqual gt = (GreaterThanOrEqual) filter;
             int index = fieldIndex(gt.attribute());
-            Literal literal = convertLiteral(index, gt.value());
-            return PredicateBuilder.greaterOrEqual(index, literal);
+            Object literal = convertLiteral(index, gt.value());
+            return builder.greaterOrEqual(index, literal);
         } else if (filter instanceof LessThan) {
             LessThan lt = (LessThan) filter;
             int index = fieldIndex(lt.attribute());
-            Literal literal = convertLiteral(index, lt.value());
-            return PredicateBuilder.lessThan(index, literal);
+            Object literal = convertLiteral(index, lt.value());
+            return builder.lessThan(index, literal);
         } else if (filter instanceof LessThanOrEqual) {
             LessThanOrEqual lt = (LessThanOrEqual) filter;
             int index = fieldIndex(lt.attribute());
-            Literal literal = convertLiteral(index, lt.value());
-            return PredicateBuilder.lessOrEqual(index, literal);
+            Object literal = convertLiteral(index, lt.value());
+            return builder.lessOrEqual(index, literal);
         } else if (filter instanceof IsNull) {
-            return PredicateBuilder.isNull(fieldIndex(((IsNull) filter).attribute()));
+            return builder.isNull(fieldIndex(((IsNull) filter).attribute()));
         } else if (filter instanceof IsNotNull) {
-            return PredicateBuilder.isNotNull(fieldIndex(((IsNotNull) filter).attribute()));
+            return builder.isNotNull(fieldIndex(((IsNotNull) filter).attribute()));
         } else if (filter instanceof And) {
             And and = (And) filter;
             return PredicateBuilder.and(convert(and.left()), convert(and.right()));
@@ -89,8 +92,8 @@ public class SparkFilterConverter {
         } else if (filter instanceof StringStartsWith) {
             StringStartsWith startsWith = (StringStartsWith) filter;
             int index = fieldIndex(startsWith.attribute());
-            Literal literal = convertLiteral(index, startsWith.value());
-            return PredicateBuilder.startsWith(index, literal);
+            Object literal = convertLiteral(index, startsWith.value());
+            return builder.startsWith(index, literal);
         }
 
         // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
@@ -106,8 +109,8 @@ public class SparkFilterConverter {
         return index;
     }
 
-    private Literal convertLiteral(int index, Object value) {
+    private Object convertLiteral(int index, Object value) {
         LogicalType type = rowType.getTypeAt(index);
-        return Literal.fromJavaObject(type, value);
+        return convertJavaObject(type, value);
     }
 }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
index dead152..dcc10ed 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.types.logical.DateType;
@@ -51,55 +50,55 @@ public class SparkFilterConverterTest {
 
     @Test
     public void testAll() {
-        SparkFilterConverter converter =
-                new SparkFilterConverter(
-                        new RowType(
-                                Collections.singletonList(
-                                        new RowType.RowField("id", new IntType()))));
+        RowType rowType =
+                new RowType(Collections.singletonList(new RowType.RowField("id", new IntType())));
+        SparkFilterConverter converter = new SparkFilterConverter(rowType);
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+
         String field = "id";
         IsNull isNull = IsNull.apply(field);
-        Predicate expectedIsNull = PredicateBuilder.isNull(0);
+        Predicate expectedIsNull = builder.isNull(0);
         Predicate actualIsNull = converter.convert(isNull);
         assertThat(actualIsNull).isEqualTo(expectedIsNull);
 
         IsNotNull isNotNull = IsNotNull.apply(field);
-        Predicate expectedIsNotNull = PredicateBuilder.isNotNull(0);
+        Predicate expectedIsNotNull = builder.isNotNull(0);
         Predicate actualIsNotNull = converter.convert(isNotNull);
         assertThat(actualIsNotNull).isEqualTo(expectedIsNotNull);
 
         LessThan lt = LessThan.apply(field, 1);
-        Predicate expectedLt = PredicateBuilder.lessThan(0, new Literal(new IntType(), 1));
+        Predicate expectedLt = builder.lessThan(0, 1);
         Predicate actualLt = converter.convert(lt);
         assertThat(actualLt).isEqualTo(expectedLt);
 
         LessThanOrEqual ltEq = LessThanOrEqual.apply(field, 1);
-        Predicate expectedLtEq = PredicateBuilder.lessOrEqual(0, new Literal(new IntType(), 1));
+        Predicate expectedLtEq = builder.lessOrEqual(0, 1);
         Predicate actualLtEq = converter.convert(ltEq);
         assertThat(actualLtEq).isEqualTo(expectedLtEq);
 
         GreaterThan gt = GreaterThan.apply(field, 1);
-        Predicate expectedGt = PredicateBuilder.greaterThan(0, new Literal(new IntType(), 1));
+        Predicate expectedGt = builder.greaterThan(0, 1);
         Predicate actualGt = converter.convert(gt);
         assertThat(actualGt).isEqualTo(expectedGt);
 
         GreaterThanOrEqual gtEq = GreaterThanOrEqual.apply(field, 1);
-        Predicate expectedGtEq = PredicateBuilder.greaterOrEqual(0, new Literal(new IntType(), 1));
+        Predicate expectedGtEq = builder.greaterOrEqual(0, 1);
         Predicate actualGtEq = converter.convert(gtEq);
         assertThat(actualGtEq).isEqualTo(expectedGtEq);
 
         EqualTo eq = EqualTo.apply(field, 1);
-        Predicate expectedEq = PredicateBuilder.equal(0, new Literal(new IntType(), 1));
+        Predicate expectedEq = builder.equal(0, 1);
         Predicate actualEq = converter.convert(eq);
         assertThat(actualEq).isEqualTo(expectedEq);
     }
 
     @Test
     public void testTimestamp() {
-        SparkFilterConverter converter =
-                new SparkFilterConverter(
-                        new RowType(
-                                Collections.singletonList(
-                                        new RowType.RowField("x", new TimestampType()))));
+        RowType rowType =
+                new RowType(
+                        Collections.singletonList(new RowType.RowField("x", new TimestampType())));
+        SparkFilterConverter converter = new SparkFilterConverter(rowType);
+        PredicateBuilder builder = new PredicateBuilder(rowType);
 
         Timestamp timestamp = Timestamp.valueOf("2018-10-18 00:00:57.907");
         LocalDateTime localDateTime = LocalDateTime.parse("2018-10-18T00:00:57.907");
@@ -108,11 +107,7 @@ public class SparkFilterConverterTest {
         Predicate instantExpression = converter.convert(GreaterThan.apply("x", instant));
         Predicate timestampExpression = converter.convert(GreaterThan.apply("x", timestamp));
         Predicate rawExpression =
-                PredicateBuilder.greaterThan(
-                        0,
-                        new Literal(
-                                new TimestampType(),
-                                TimestampData.fromLocalDateTime(localDateTime)));
+                builder.greaterThan(0, TimestampData.fromLocalDateTime(localDateTime));
 
         assertThat(timestampExpression).isEqualTo(rawExpression);
         assertThat(instantExpression).isEqualTo(rawExpression);
@@ -120,11 +115,10 @@ public class SparkFilterConverterTest {
 
     @Test
     public void testDate() {
-        SparkFilterConverter converter =
-                new SparkFilterConverter(
-                        new RowType(
-                                Collections.singletonList(
-                                        new RowType.RowField("x", new DateType()))));
+        RowType rowType =
+                new RowType(Collections.singletonList(new RowType.RowField("x", new DateType())));
+        SparkFilterConverter converter = new SparkFilterConverter(rowType);
+        PredicateBuilder builder = new PredicateBuilder(rowType);
 
         LocalDate localDate = LocalDate.parse("2018-10-18");
         Date date = Date.valueOf(localDate);
@@ -132,8 +126,7 @@ public class SparkFilterConverterTest {
 
         Predicate localDateExpression = converter.convert(GreaterThan.apply("x", localDate));
         Predicate dateExpression = converter.convert(GreaterThan.apply("x", date));
-        Predicate rawExpression =
-                PredicateBuilder.greaterThan(0, new Literal(new DateType(), epochDay));
+        Predicate rawExpression = builder.greaterThan(0, epochDay);
 
         assertThat(dateExpression).isEqualTo(rawExpression);
         assertThat(localDateExpression).isEqualTo(rawExpression);