Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/05 13:33:36 UTC

[flink-table-store] branch master updated: [FLINK-29129] Try best to push down value filters

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 769f19d8 [FLINK-29129] Try best to push down value filters
769f19d8 is described below

commit 769f19d8566f0a30d3a7c09b297b9dce3280ac1e
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Sep 5 21:33:32 2022 +0800

    [FLINK-29129] Try best to push down value filters
    
    This closes #280
---
 .../flink/table/store/format/FileFormat.java       |  4 +-
 .../source/FileStoreSourceReaderTest.java          | 24 +++++++
 .../source/FileStoreSourceSplitReaderTest.java     | 23 +++++++
 .../table/store/file/data/DataFileReader.java      |  5 +-
 .../store/file/mergetree/MergeTreeReader.java      |  6 ++
 .../file/operation/AppendOnlyFileStoreRead.java    |  4 +-
 .../file/operation/KeyValueFileStoreRead.java      | 74 ++++++++++++++++++----
 .../table/store/file/schema/SchemaManager.java     |  4 +-
 .../table/ChangelogWithKeyFileStoreTable.java      | 23 +------
 .../flink/table/store/file/TestFileStore.java      |  8 ++-
 .../table/store/file/data/DataFileTestUtils.java   | 16 +++++
 .../file/format/FileStatsExtractingAvroFormat.java |  4 +-
 .../store/file/format/FlushingFileFormat.java      |  4 +-
 .../store/file/operation/FileStoreCommitTest.java  | 15 ++++-
 .../store/file/operation/FileStoreExpireTest.java  | 14 +++-
 .../file/operation/KeyValueFileStoreReadTest.java  | 26 ++++++--
 .../file/operation/KeyValueFileStoreScanTest.java  | 16 ++++-
 .../store/table/AppendOnlyFileStoreTableTest.java  |  1 -
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 39 +++++++++---
 .../table/store/format/avro/AvroFileFormat.java    |  4 +-
 .../table/store/format/orc/OrcFileFormat.java      |  4 +-
 21 files changed, 255 insertions(+), 63 deletions(-)

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
index d457e258..f34d4e1b 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -59,7 +61,7 @@ public abstract class FileFormat {
      * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
      */
     public abstract BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<Predicate> filters);
+            RowType type, int[][] projection, @Nullable List<Predicate> filters);
 
     /** Create a {@link BulkWriter.Factory} from the type. */
     public abstract BulkWriter.Factory<RowData> createWriterFactory(RowType type);
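
Note that the filter list passed to createReaderFactory is now explicitly @Nullable, so concrete formats must tolerate the absence of pushed-down filters (OrcFileFormat below already guards with a null check). A minimal sketch of that contract, using a hypothetical helper class that is not part of this commit:

    import org.apache.flink.table.store.file.predicate.Predicate;

    import javax.annotation.Nullable;

    import java.util.Collections;
    import java.util.List;

    /** Hypothetical helper, not in this commit: treat a null filter list as "no filters". */
    final class FilterLists {
        private FilterLists() {}

        static List<Predicate> orEmpty(@Nullable List<Predicate> filters) {
            // a format receives null when the caller has nothing to push down; an empty
            // conjunction means the same thing and is easier to handle downstream
            return filters == null ? Collections.emptyList() : filters;
        }
    }
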
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index a7d9d2dc..d48b9c56 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -19,10 +19,18 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
@@ -34,6 +42,22 @@ public class FileStoreSourceReaderTest {
 
     @TempDir java.nio.file.Path tempDir;
 
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        new RowType(
+                                Arrays.asList(
+                                        new RowType.RowField("k", new BigIntType()),
+                                        new RowType.RowField("v", new BigIntType()),
+                                        new RowType.RowField("default", new IntType()))),
+                        Collections.singletonList("default"),
+                        Arrays.asList("k", "default"),
+                        Collections.emptyMap(),
+                        null));
+    }
+
     @Test
     public void testRequestSplitWhenNoSplitRestored() throws Exception {
         final TestingReaderContext context = new TestingReaderContext();
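
The new @BeforeEach is needed because KeyValueFileStoreRead now resolves the table schema through SchemaManager in its constructor (see the KeyValueFileStoreRead.java hunk further below), so any test that builds a read path must first commit a schema under the table root. A condensed sketch of that bootstrap, with the UpdateSchema arguments annotated as inferred from the usages in this commit:

    SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
    schemaManager.commitNewVersion(
            new UpdateSchema(
                    rowType,                              // full row type of the test table
                    Collections.singletonList("default"), // partition keys
                    Arrays.asList("k", "default"),        // primary keys
                    Collections.emptyMap(),               // table options
                    null));                               // table comment
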
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 32a40de1..25211fe6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -23,15 +23,22 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -67,6 +74,22 @@ public class FileStoreSourceSplitReaderTest {
         service = null;
     }
 
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        new RowType(
+                                Arrays.asList(
+                                        new RowType.RowField("k", new BigIntType()),
+                                        new RowType.RowField("v", new BigIntType()),
+                                        new RowType.RowField("default", new IntType()))),
+                        Collections.singletonList("default"),
+                        Arrays.asList("k", "default"),
+                        Collections.emptyMap(),
+                        null));
+    }
+
     @Test
     public void testPrimaryKey() throws Exception {
         innerTestOnce(false, 0);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index 5da2a2b0..7a0c6eef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -178,7 +178,10 @@ public class DataFileReader {
         }
 
         public DataFileReader create(
-                BinaryRowData partition, int bucket, boolean projectKeys, List<Predicate> filters) {
+                BinaryRowData partition,
+                int bucket,
+                boolean projectKeys,
+                @Nullable List<Predicate> filters) {
             int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
             RowType projectedKeyType = projectKeys ? this.projectedKeyType : keyType;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index da39d0a3..a3830bc1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -62,6 +62,12 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
         this.reader = ConcatRecordReader.create(readers);
     }
 
+    public MergeTreeReader(boolean dropDelete, List<ReaderSupplier<KeyValue>> sectionReaders)
+            throws IOException {
+        this.dropDelete = dropDelete;
+        this.reader = ConcatRecordReader.create(sectionReaders);
+    }
+
     @Nullable
     @Override
     public RecordIterator<KeyValue> readBatch() throws IOException {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 176b7524..7f4f21a6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -51,7 +53,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
 
     private int[][] projection;
 
-    private List<Predicate> filters;
+    @Nullable private List<Predicate> filters;
 
     public AppendOnlyFileStoreRead(
             SchemaManager schemaManager,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 40bd85c5..0f7a6648 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -23,11 +23,13 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
 import org.apache.flink.table.store.file.utils.RecordReader;
@@ -35,13 +37,18 @@ import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.data.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.containsFields;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
 
 /**
@@ -50,13 +57,17 @@ import static org.apache.flink.table.store.file.predicate.PredicateBuilder.split
  */
 public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
 
+    private final TableSchema tableSchema;
     private final DataFileReader.Factory dataFileReaderFactory;
     private final Comparator<RowData> keyComparator;
     private final MergeFunction mergeFunction;
+    private final boolean valueCountMode;
+
+    @Nullable private int[][] keyProjectedFields;
 
-    private int[][] keyProjectedFields;
+    @Nullable private List<Predicate> filtersForOverlappedSection;
 
-    private List<Predicate> filters;
+    @Nullable private List<Predicate> filtersForNonOverlappedSection;
 
     public KeyValueFileStoreRead(
             SchemaManager schemaManager,
@@ -67,11 +78,13 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
             MergeFunction mergeFunction,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory) {
+        this.tableSchema = schemaManager.schema(schemaId);
         this.dataFileReaderFactory =
                 new DataFileReader.Factory(
                         schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
         this.keyComparator = keyComparator;
         this.mergeFunction = mergeFunction;
+        this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
     }
 
     public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -87,7 +100,27 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
 
     @Override
     public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
-        this.filters = splitAnd(predicate);
+        List<Predicate> allFilters = new ArrayList<>();
+        List<Predicate> pkFilters = null;
+        List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+        Set<String> nonPrimaryKeys =
+                tableSchema.fieldNames().stream()
+                        .filter(name -> !primaryKeys.contains(name))
+                        .collect(Collectors.toSet());
+        for (Predicate sub : splitAnd(predicate)) {
+            allFilters.add(sub);
+            if (!containsFields(sub, nonPrimaryKeys)) {
+                if (pkFilters == null) {
+                    pkFilters = new ArrayList<>();
+                }
+                // TODO Actually, the index is wrong, but it is OK.
+                //  The orc filter just use name instead of index.
+                pkFilters.add(sub);
+            }
+        }
+        // for section which does not have key range overlap, push down value filters too
+        filtersForNonOverlappedSection = allFilters;
+        filtersForOverlappedSection = valueCountMode ? allFilters : pkFilters;
         return this;
     }
 
@@ -95,7 +128,8 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
     public RecordReader<KeyValue> createReader(Split split) throws IOException {
         if (split.isIncremental()) {
             DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), true, filters);
+                    dataFileReaderFactory.create(
+                            split.partition(), split.bucket(), true, filtersForOverlappedSection);
             // Return the raw file contents without merging
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
             for (DataFileMeta file : split.files()) {
@@ -106,15 +140,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
         } else {
             // in this case merge tree should merge records with same key
             // Do not project key in MergeTreeReader.
-            DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), false, filters);
-            MergeTreeReader reader =
-                    new MergeTreeReader(
-                            new IntervalPartition(split.files(), keyComparator).partition(),
-                            true,
-                            dataFileReader,
-                            keyComparator,
-                            mergeFunction.copy());
+            MergeTreeReader reader = new MergeTreeReader(true, createSectionReaders(split));
 
             // project key using ProjectKeyRecordReader
             return keyProjectedFields == null
@@ -123,6 +149,28 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
         }
     }
 
+    private List<ConcatRecordReader.ReaderSupplier<KeyValue>> createSectionReaders(Split split) {
+        List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+        MergeFunction mergeFunc = mergeFunction.copy();
+        for (List<SortedRun> section :
+                new IntervalPartition(split.files(), keyComparator).partition()) {
+            DataFileReader dataFileReader = createDataFileReader(split, section.size() > 1);
+            sectionReaders.add(
+                    () ->
+                            MergeTreeReader.readerForSection(
+                                    section, dataFileReader, keyComparator, mergeFunc));
+        }
+        return sectionReaders;
+    }
+
+    private DataFileReader createDataFileReader(Split split, boolean overlapped) {
+        return dataFileReaderFactory.create(
+                split.partition(),
+                split.bucket(),
+                false,
+                overlapped ? filtersForOverlappedSection : filtersForNonOverlappedSection);
+    }
+
     private Optional<String> changelogFile(DataFileMeta fileMeta) {
         for (String file : fileMeta.extraFiles()) {
             if (file.startsWith(CHANGELOG_FILE_PREFIX)) {
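
The split into filtersForOverlappedSection and filtersForNonOverlappedSection is the heart of this change: key filters are always safe because merging happens per key, while value filters are only safe for sections consisting of a single sorted run, where every key occurs at most once. In value-count mode there are no trimmed primary keys, every field is part of the key, and the two lists coincide. A short scenario, mirroring ChangelogWithKeyFileStoreTableTest further below and assuming its write/commit/table/builder setup, shows why an overlapping section must not see value filters:

    write.write(GenericRowData.of(1, 60, 600L));  // older file: pk 60 -> 600
    commit.commit("2", write.prepareCommit(true));
    write.write(GenericRowData.of(1, 60, 500L));  // newer file: pk 60 -> 500, key ranges overlap
    commit.commit("3", write.prepareCommit(true));

    // The filter b = 600 matches only the stale row. Pushed into the overlapping section it
    // could skip the newer file and merge key 60 back to the outdated value 600. Such sections
    // therefore only receive the key conjuncts, while single-run sections receive all of them.
    TableRead read = table.newRead().withFilter(builder.equal(2, 600L));
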
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index d9ab4b75..0b387441 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.schema;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
@@ -314,7 +315,8 @@ public class SchemaManager implements Serializable {
         return new Path(tableRoot + "/schema");
     }
 
-    private Path toSchemaPath(long id) {
+    @VisibleForTesting
+    public Path toSchemaPath(long id) {
         return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index c7e3f0d8..8a983479 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -50,13 +50,10 @@ import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
-import static org.apache.flink.table.store.file.predicate.PredicateBuilder.containsFields;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
 
@@ -155,29 +152,11 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableRead newRead() {
-        List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
-        Set<String> nonPrimaryKeys =
-                tableSchema.fieldNames().stream()
-                        .filter(name -> !primaryKeys.contains(name))
-                        .collect(Collectors.toSet());
         return new KeyValueTableRead(store.newRead()) {
 
             @Override
             public TableRead withFilter(Predicate predicate) {
-                List<Predicate> predicates = new ArrayList<>();
-                for (Predicate sub : splitAnd(predicate)) {
-                    // TODO support value filter
-                    if (containsFields(sub, nonPrimaryKeys)) {
-                        continue;
-                    }
-
-                    // TODO Actually, the index is wrong, but it is OK. The orc filter
-                    // just use name instead of index.
-                    predicates.add(sub);
-                }
-                if (predicates.size() > 0) {
-                    read.withFilter(and(predicates));
-                }
+                read.withFilter(predicate);
                 return this;
             }
 
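With the per-section handling above in place, the table-level read no longer drops conjuncts that touch value fields; it forwards the whole predicate and lets KeyValueFileStoreRead decide what each section can safely take. Caller-side usage is unchanged, e.g. (assuming a table whose field at index 2 is a value column, as in the tests below):

    PredicateBuilder builder = new PredicateBuilder(rowType);
    TableRead read = table.newRead().withFilter(builder.equal(2, 300L)); // value filter, now forwarded
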
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 1b347791..339f8011 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -378,14 +378,19 @@ public class TestFileStore extends KeyValueFileStore {
     }
 
     private Set<Path> getFilesInUse() {
+        Set<Path> result = new HashSet<>();
         FileStorePathFactory pathFactory = pathFactory();
         ManifestList manifestList = manifestListFactory().create();
         FileStoreScan scan = newScan();
 
+        SchemaManager schemaManager = new SchemaManager(options.path());
+        schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id)));
+
         SnapshotManager snapshotManager = snapshotManager();
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
+
         if (latestSnapshotId == null) {
-            return Collections.emptySet();
+            return result;
         }
 
         long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
@@ -396,7 +401,6 @@ public class TestFileStore extends KeyValueFileStore {
             }
         }
 
-        Set<Path> result = new HashSet<>();
         for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
             Path snapshotPath = snapshotManager.snapshotPath(id);
             Snapshot snapshot = Snapshot.fromPath(snapshotPath);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
index b977c6a8..9b0736f5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.stats.StatsTestUtils;
 
 /** Utils for {@link DataFileMeta}. */
 public class DataFileTestUtils {
@@ -47,6 +48,21 @@ public class DataFileTestUtils {
                 DataFileMeta.DUMMY_LEVEL);
     }
 
+    public static DataFileMeta newFile() {
+        return new DataFileMeta(
+                "",
+                0,
+                0,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                StatsTestUtils.newEmptyTableStats(),
+                StatsTestUtils.newEmptyTableStats(),
+                0,
+                0,
+                0,
+                0);
+    }
+
     public static DataFileMeta newFile(
             String name, int level, int minKey, int maxKey, long maxSequence) {
         return new DataFileMeta(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 692ba1b5..d63305e2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Optional;
 
@@ -44,7 +46,7 @@ public class FileStatsExtractingAvroFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<Predicate> filters) {
+            RowType type, int[][] projection, @Nullable List<Predicate> filters) {
         return avro.createReaderFactory(type, projection, filters);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index 94e11003..0ffa1e12 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -27,6 +27,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -42,7 +44,7 @@ public class FlushingFileFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<Predicate> filters) {
+            RowType type, int[][] projection, @Nullable List<Predicate> filters) {
         return format.createReaderFactory(type, projection, filters);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 13accde3..37c168a0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -339,16 +341,25 @@ public class FileStoreCommitTest {
         assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id() + 1);
     }
 
-    private TestFileStore createStore(boolean failing) {
+    private TestFileStore createStore(boolean failing) throws Exception {
         return createStore(failing, 1);
     }
 
-    private TestFileStore createStore(boolean failing, int numBucket) {
+    private TestFileStore createStore(boolean failing, int numBucket) throws Exception {
         String root =
                 failing
                         ? FailingAtomicRenameFileSystem.getFailingPath(
                                 failingName, tempDir.toString())
                         : TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString();
+        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                        Collections.emptyMap(),
+                        null));
         return TestFileStore.create(
                 "avro",
                 root,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index b9fac9fd..49587a20 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 
@@ -40,6 +42,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -56,7 +59,7 @@ public class FileStoreExpireTest {
     private SnapshotManager snapshotManager;
 
     @BeforeEach
-    public void beforeEach() throws IOException {
+    public void beforeEach() throws Exception {
         gen = new TestKeyValueGenerator();
         store =
                 TestFileStore.create(
@@ -68,6 +71,15 @@ public class FileStoreExpireTest {
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new DeduplicateMergeFunction());
         snapshotManager = store.snapshotManager();
+        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                        Collections.emptyMap(),
+                        null));
     }
 
     @AfterEach
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index a2086ba8..a48bbdc4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -28,6 +29,8 @@ import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.source.Split;
@@ -42,12 +45,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -222,10 +227,23 @@ public class KeyValueFileStoreReadTest {
     }
 
     private TestFileStore createStore(
-            RowType partitionType,
-            RowType keyType,
-            RowType valueType,
-            MergeFunction mergeFunction) {
+            RowType partitionType, RowType keyType, RowType valueType, MergeFunction mergeFunction)
+            throws Exception {
+        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+        boolean valueCountMode = mergeFunction instanceof ValueCountMergeFunction;
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        valueCountMode ? keyType : valueType,
+                        partitionType.getFieldNames(),
+                        valueCountMode
+                                ? Collections.emptyList()
+                                : Stream.concat(
+                                                keyType.getFieldNames().stream()
+                                                        .map(field -> field.replace("key_", "")),
+                                                partitionType.getFieldNames().stream())
+                                        .collect(Collectors.toList()),
+                        Collections.emptyMap(),
+                        null));
         return TestFileStore.create(
                 "avro", tempDir.toString(), 1, partitionType, keyType, valueType, mergeFunction);
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index 201eaab5..79ba448f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
@@ -27,6 +28,8 @@ import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -37,6 +40,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +61,7 @@ public class KeyValueFileStoreScanTest {
     private SnapshotManager snapshotManager;
 
     @BeforeEach
-    public void beforeEach() {
+    public void beforeEach() throws Exception {
         gen = new TestKeyValueGenerator();
         store =
                 TestFileStore.create(
@@ -69,6 +73,16 @@ public class KeyValueFileStoreScanTest {
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new DeduplicateMergeFunction());
         snapshotManager = store.snapshotManager();
+
+        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                        Collections.emptyMap(),
+                        null));
     }
 
     @Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 9cc7ed32..620252bb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -190,7 +190,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits =
                 table.newScan().withFilter(partitionFilter).withBucket(bucket).plan().splits;
         TableRead read = table.newRead();
-        getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING);
 
         assertThat(getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING))
                 .containsExactlyElementsOf(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 9b1495fe..8ff0b786 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -228,25 +228,46 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(GenericRowData.of(1, 60, 600L));
         commit.commit("2", write.prepareCommit(true));
 
-        write.close();
-
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
         List<Split> splits = table.newScan().plan().splits;
 
-        TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+        // push down key filter a = 30
+        TableRead read = table.newRead().withFilter(builder.equal(1, 30));
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+
+        // push down value filter b = 300L
+        read = table.newRead().withFilter(builder.equal(2, 300L));
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+
+        // push down both key filter and value filter
+        read =
+                table.newRead()
+                        .withFilter(
+                                PredicateBuilder.or(builder.equal(1, 10), builder.equal(2, 300L)));
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("1|10|100", "1|20|200", "1|30|300", "1|40|400"));
+
+        // update pk 60, 10
+        write.write(GenericRowData.of(1, 60, 500L));
+        write.write(GenericRowData.of(1, 10, 10L));
+        commit.commit("3", write.prepareCommit(true));
+
+        write.close();
+
+        // cannot push down value filter b = 600L
+        splits = table.newScan().plan().splits;
+        read = table.newRead().withFilter(builder.equal(2, 600L));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "1|10|100",
+                                "1|10|10",
                                 "1|20|200",
                                 "1|30|300",
                                 "1|40|400",
                                 "1|50|500",
-                                "1|60|600"));
-
-        read = table.newRead().withFilter(builder.equal(1, 30));
-        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+                                "1|60|500"));
     }
 
     @Override
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index 90618f1d..f908fae2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -46,6 +46,8 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
@@ -65,7 +67,7 @@ public class AvroFileFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<Predicate> filters) {
+            RowType type, int[][] projection, @Nullable List<Predicate> filters) {
         // avro is a file format that keeps schemas in file headers,
         // if the schema given to the reader is not equal to the schema in header,
         // reader will automatically map the fields and give back records with our desired
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 880074c2..8b028210 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -38,6 +38,8 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.orc.TypeDescription;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -67,7 +69,7 @@ public class OrcFileFormat extends FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<Predicate> filters) {
+            RowType type, int[][] projection, @Nullable List<Predicate> filters) {
         List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
 
         if (filters != null) {