You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/03 00:11:05 UTC

[flink-table-store] branch master updated: [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 1012ea20 [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData
1012ea20 is described below

commit 1012ea20cecbd6237acefc6427769bec164d5076
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jun 3 08:11:00 2022 +0800

    [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData
    
    This closes #145
---
 .../flink/table/store/connector/TableStore.java    |  15 +-
 .../store/connector/TableStoreFactoryOptions.java  |  16 --
 .../store/connector/TableStoreManagedFactory.java  |   2 +-
 .../connector/source/FileStoreSourceTest.java      |   1 -
 .../flink/table/store/file/FileStoreImpl.java      |  12 +-
 .../flink/table/store/file/FileStoreOptions.java   |  12 ++
 .../store/file/predicate/PredicateBuilder.java     |  18 ++
 .../flink/table/store/file/schema/Schema.java      |  22 +++
 .../store/table/AppendOnlyFileStoreTable.java      |  89 ++++++++++
 .../table/ChangelogValueCountFileStoreTable.java   | 105 ++++++++++++
 .../table/ChangelogWithKeyFileStoreTable.java      | 130 +++++++++++++++
 .../flink/table/store/table/FileStoreTable.java    |  39 +++++
 .../table/store/table/FileStoreTableFactory.java   |  53 ++++++
 .../store/table/source/DefaultSplitGenerator.java  |  60 +++++++
 .../flink/table/store/table/source/Split.java      |  59 +++++++
 .../table/store/table/source/SplitGenerator.java   |  34 ++++
 .../flink/table/store/table/source/TableRead.java  |  72 ++++++++
 .../flink/table/store/table/source/TableScan.java  | 119 ++++++++++++++
 .../source/ValueContentRowDataRecordIterator.java  |  64 +++++++
 .../source/ValueCountRowDataRecordIterator.java    |  86 ++++++++++
 .../flink/table/store/file/TestFileStore.java      |   1 -
 .../store/file/predicate/PredicateBuilderTest.java |  53 ++++++
 .../table/store/file/utils/ReusingTestData.java    |   2 +-
 .../source/RowDataRecordIteratorTestBase.java      |  63 +++++++
 .../ValueContentRowDataRecordIteratorTest.java     |  51 ++++++
 .../ValueCountRowDataRecordIteratorTest.java       |  86 ++++++++++
 .../flink/table/store/TableStoreJobConf.java       |   1 +
 .../table/store/mapred/TableStoreInputFormat.java  | 183 ++++-----------------
 .../table/store/mapred/TableStoreInputSplit.java   |   8 +-
 .../table/store/mapred/TableStoreRecordReader.java |  15 +-
 .../flink/table/store/FileStoreTestHelper.java     |  75 ++++-----
 .../hive/TableStoreHiveStorageHandlerITCase.java   |  40 ++---
 .../store/mapred/TableStoreRecordReaderTest.java   |  28 ++--
 33 files changed, 1316 insertions(+), 298 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 3060323f..3e9b1f7c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -180,7 +180,6 @@ public class TableStore {
         FileStoreOptions fileStoreOptions = new FileStoreOptions(options);
 
         return FileStoreImpl.createWithAppendOnly(
-                fileStoreOptions.path().toString(),
                 schema.id(),
                 fileStoreOptions,
                 user,
@@ -195,15 +194,9 @@ public class TableStore {
 
         if (trimmedPrimaryKeys.length == 0) {
             return FileStoreImpl.createWithValueCount(
-                    fileStoreOptions.path().toString(),
-                    schema.id(),
-                    fileStoreOptions,
-                    user,
-                    partitionType,
-                    type);
+                    schema.id(), fileStoreOptions, user, partitionType, type);
         } else {
             return FileStoreImpl.createWithPrimaryKey(
-                    fileStoreOptions.path().toString(),
                     schema.id(),
                     fileStoreOptions,
                     user,
@@ -215,7 +208,7 @@ public class TableStore {
     }
 
     private FileStore buildFileStore() {
-        WriteMode writeMode = options.get(TableStoreFactoryOptions.WRITE_MODE);
+        WriteMode writeMode = options.get(FileStoreOptions.WRITE_MODE);
 
         switch (writeMode) {
             case CHANGE_LOG:
@@ -320,7 +313,7 @@ public class TableStore {
         }
 
         private Source<RowData, ?, ?> buildSource() {
-            WriteMode writeMode = options.get(TableStoreFactoryOptions.WRITE_MODE);
+            WriteMode writeMode = options.get(FileStoreOptions.WRITE_MODE);
             if (isContinuous) {
                 if (schema.primaryKeys().size() > 0 && mergeEngine() == PARTIAL_UPDATE) {
                     throw new ValidationException(
@@ -411,7 +404,7 @@ public class TableStore {
         public DataStreamSink<?> build() {
             FileStore fileStore = buildFileStore();
             int numBucket = options.get(BUCKET);
-            WriteMode writeMode = options.get(TableStoreFactoryOptions.WRITE_MODE);
+            WriteMode writeMode = options.get(FileStoreOptions.WRITE_MODE);
 
             BucketStreamPartitioner partitioner =
                     new BucketStreamPartitioner(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 2f429cce..b0aa9302 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -21,15 +21,11 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.description.Description;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.WriteMode;
 
 import java.util.HashSet;
 import java.util.Set;
 
-import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
-
 /** Options for {@link TableStoreManagedFactory}. */
 public class TableStoreFactoryOptions {
 
@@ -67,18 +63,6 @@ public class TableStoreFactoryOptions {
                     .noDefaultValue()
                     .withDescription("The log system used to keep changes of the table.");
 
-    public static final ConfigOption<WriteMode> WRITE_MODE =
-            ConfigOptions.key("write-mode")
-                    .enumType(WriteMode.class)
-                    .defaultValue(WriteMode.CHANGE_LOG)
-                    .withDescription(
-                            Description.builder()
-                                    .text("Specify the write mode for table.")
-                                    .linebreak()
-                                    .list(formatEnumOption(WriteMode.APPEND_ONLY))
-                                    .list(formatEnumOption(WriteMode.CHANGE_LOG))
-                                    .build());
-
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     public static final ConfigOption<Integer> SCAN_PARALLELISM =
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 60547536..e858ca5e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -40,10 +40,10 @@ import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.WRITE_MODE;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
 import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.file.FileStoreOptions.WRITE_MODE;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 
 /** Default implementation of {@link ManagedTableFactory}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
index 037c09e1..a33816a9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
@@ -146,7 +146,6 @@ public class FileStoreSourceTest {
         MergeFunction mergeFunction =
                 hasPk ? new DeduplicateMergeFunction() : new ValueCountMergeFunction();
         return new FileStoreImpl(
-                "/fake/path",
                 0,
                 new FileStoreOptions(new Configuration()),
                 WriteMode.CHANGE_LOG,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index f27808b1..6c10899b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
@@ -49,7 +48,6 @@ import java.util.stream.Collectors;
 /** File store implementation. */
 public class FileStoreImpl implements FileStore {
 
-    private final String tablePath;
     private final long schemaId;
     private final WriteMode writeMode;
     private final FileStoreOptions options;
@@ -61,7 +59,6 @@ public class FileStoreImpl implements FileStore {
     private final GeneratedRecordComparator genRecordComparator;
 
     public FileStoreImpl(
-            String tablePath,
             long schemaId,
             FileStoreOptions options,
             WriteMode writeMode,
@@ -70,7 +67,6 @@ public class FileStoreImpl implements FileStore {
             RowType keyType,
             RowType valueType,
             @Nullable MergeFunction mergeFunction) {
-        this.tablePath = tablePath;
         this.schemaId = schemaId;
         this.options = options;
         this.writeMode = writeMode;
@@ -86,7 +82,7 @@ public class FileStoreImpl implements FileStore {
 
     public FileStorePathFactory pathFactory() {
         return new FileStorePathFactory(
-                new Path(tablePath), partitionType, options.partitionDefaultName());
+                options.path(), partitionType, options.partitionDefaultName());
     }
 
     @VisibleForTesting
@@ -187,14 +183,12 @@ public class FileStoreImpl implements FileStore {
     }
 
     public static FileStoreImpl createWithAppendOnly(
-            String tablePath,
             long schemaId,
             FileStoreOptions options,
             String user,
             RowType partitionType,
             RowType rowType) {
         return new FileStoreImpl(
-                tablePath,
                 schemaId,
                 options,
                 WriteMode.APPEND_ONLY,
@@ -206,7 +200,6 @@ public class FileStoreImpl implements FileStore {
     }
 
     public static FileStoreImpl createWithPrimaryKey(
-            String tablePath,
             long schemaId,
             FileStoreOptions options,
             String user,
@@ -244,7 +237,6 @@ public class FileStoreImpl implements FileStore {
         }
 
         return new FileStoreImpl(
-                tablePath,
                 schemaId,
                 options,
                 WriteMode.CHANGE_LOG,
@@ -256,7 +248,6 @@ public class FileStoreImpl implements FileStore {
     }
 
     public static FileStoreImpl createWithValueCount(
-            String tablePath,
             long schemaId,
             FileStoreOptions options,
             String user,
@@ -267,7 +258,6 @@ public class FileStoreImpl implements FileStore {
                         new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
         MergeFunction mergeFunction = new ValueCountMergeFunction();
         return new FileStoreImpl(
-                tablePath,
                 schemaId,
                 options,
                 WriteMode.CHANGE_LOG,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 06dd7285..569ecc4b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -129,6 +129,18 @@ public class FileStoreOptions implements Serializable {
                                             formatEnumOption(MergeEngine.PARTIAL_UPDATE))
                                     .build());
 
+    public static final ConfigOption<WriteMode> WRITE_MODE =
+            ConfigOptions.key("write-mode")
+                    .enumType(WriteMode.class)
+                    .defaultValue(WriteMode.CHANGE_LOG)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the write mode for table.")
+                                    .linebreak()
+                                    .list(formatEnumOption(WriteMode.APPEND_ONLY))
+                                    .list(formatEnumOption(WriteMode.CHANGE_LOG))
+                                    .build());
+
     private final Configuration options;
 
     public static Set<ConfigOption<?>> allOptions() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index b431d81e..03285ee0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -106,4 +107,21 @@ public class PredicateBuilder {
                         greaterOrEqual(idx, includedLowerBound),
                         lessOrEqual(idx, includedUpperBound)));
     }
+
+    public static List<Predicate> splitAnd(Predicate predicate) {
+        List<Predicate> result = new ArrayList<>();
+        splitAnd(predicate, result);
+        return result;
+    }
+
+    private static void splitAnd(Predicate predicate, List<Predicate> result) {
+        if (predicate instanceof CompoundPredicate
+                && ((CompoundPredicate) predicate).function().equals(And.INSTANCE)) {
+            for (Predicate child : ((CompoundPredicate) predicate).children()) {
+                splitAnd(child, result);
+            }
+        } else {
+            result.add(predicate);
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
index 0dbc08c0..438f4dac 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
@@ -84,6 +84,10 @@ public class Schema {
         return fields;
     }
 
+    public List<String> fieldNames() {
+        return fields.stream().map(DataField::name).collect(Collectors.toList());
+    }
+
     public int highestFieldId() {
         return highestFieldId;
     }
@@ -132,6 +136,24 @@ public class Schema {
         return (RowType) new RowDataType(fields).logicalType;
     }
 
+    public RowType logicalPartitionType() {
+        return projectedLogicalRowType(partitionKeys);
+    }
+
+    public RowType logicalPrimaryKeysType() {
+        return projectedLogicalRowType(primaryKeys);
+    }
+
+    private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
+        List<String> fieldNames = fieldNames();
+        return (RowType)
+                new RowDataType(
+                                projectedFieldNames.stream()
+                                        .map(k -> fields.get(fieldNames.indexOf(k)))
+                                        .collect(Collectors.toList()))
+                        .logicalType;
+    }
+
     @Override
     public String toString() {
         return JsonSerdeUtil.toJson(this);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
new file mode 100644
index 00000000..53255ed7
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.APPEND_ONLY,
+                        user,
+                        schema.logicalPartitionType(),
+                        RowType.of(),
+                        schema.logicalRowType(),
+                        null);
+    }
+
+    @Override
+    public TableScan newScan(boolean incremental) {
+        FileStoreScan scan = store.newScan().withIncremental(incremental);
+        return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected void withNonPartitionFilter(Predicate predicate) {
+                scan.withValueFilter(predicate);
+            }
+        };
+    }
+
+    @Override
+    public TableRead newRead(boolean incremental) {
+        return new TableRead(store.newRead()) {
+            @Override
+            public TableRead withProjection(int[][] projection) {
+                read.withValueProjection(projection);
+                return this;
+            }
+
+            @Override
+            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+                    RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
+                return new ValueContentRowDataRecordIterator(kvRecordIterator);
+            }
+        };
+    }
+
+    @Override
+    public FileStore fileStore() {
+        return store;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
new file mode 100644
index 00000000..feacd088
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without primary keys. */
+public class ChangelogValueCountFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    ChangelogValueCountFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        RowType countType =
+                RowType.of(
+                        new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
+        MergeFunction mergeFunction = new ValueCountMergeFunction();
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.CHANGE_LOG,
+                        user,
+                        schema.logicalPartitionType(),
+                        schema.logicalRowType(),
+                        countType,
+                        mergeFunction);
+    }
+
+    @Override
+    public TableScan newScan(boolean incremental) {
+        FileStoreScan scan = store.newScan().withIncremental(incremental);
+        return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected void withNonPartitionFilter(Predicate predicate) {
+                scan.withKeyFilter(predicate);
+            }
+        };
+    }
+
+    @Override
+    public TableRead newRead(boolean incremental) {
+        FileStoreRead read = store.newRead().withDropDelete(!incremental);
+        return new TableRead(read) {
+            private int[][] projection = null;
+
+            @Override
+            public TableRead withProjection(int[][] projection) {
+                if (incremental) {
+                    read.withKeyProjection(projection);
+                } else {
+                    this.projection = projection;
+                }
+                return this;
+            }
+
+            @Override
+            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+                    RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
+                return new ValueCountRowDataRecordIterator(kvRecordIterator, projection);
+            }
+        };
+    }
+
+    @Override
+    public FileStore fileStore() {
+        return store;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
new file mode 100644
index 00000000..fd887f5d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with primary keys. */
+public class ChangelogWithKeyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    ChangelogWithKeyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        RowType rowType = schema.logicalRowType();
+
+        // add _KEY_ prefix to avoid conflict with value
+        RowType keyType =
+                new RowType(
+                        schema.logicalPrimaryKeysType().getFields().stream()
+                                .map(
+                                        f ->
+                                                new RowType.RowField(
+                                                        "_KEY_" + f.getName(),
+                                                        f.getType(),
+                                                        f.getDescription().orElse(null)))
+                                .collect(Collectors.toList()));
+
+        FileStoreOptions.MergeEngine mergeEngine = conf.get(FileStoreOptions.MERGE_ENGINE);
+        MergeFunction mergeFunction;
+        switch (mergeEngine) {
+            case DEDUPLICATE:
+                mergeFunction = new DeduplicateMergeFunction();
+                break;
+            case PARTIAL_UPDATE:
+                List<LogicalType> fieldTypes = rowType.getChildren();
+                RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
+                for (int i = 0; i < fieldTypes.size(); i++) {
+                    fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+                }
+                mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
+        }
+
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.CHANGE_LOG,
+                        user,
+                        schema.logicalPartitionType(),
+                        keyType,
+                        rowType,
+                        mergeFunction);
+    }
+
+    @Override
+    public TableScan newScan(boolean incremental) {
+        FileStoreScan scan = store.newScan().withIncremental(incremental);
+        return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected void withNonPartitionFilter(Predicate predicate) {
+                scan.withValueFilter(predicate);
+            }
+        };
+    }
+
+    @Override
+    public TableRead newRead(boolean incremental) {
+        FileStoreRead read = store.newRead().withDropDelete(!incremental);
+        return new TableRead(read) {
+            @Override
+            public TableRead withProjection(int[][] projection) {
+                read.withValueProjection(projection);
+                return this;
+            }
+
+            @Override
+            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+                    RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
+                return new ValueContentRowDataRecordIterator(kvRecordIterator);
+            }
+        };
+    }
+
+    @Override
+    public FileStore fileStore() {
+        return store;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
new file mode 100644
index 00000000..b2542ce1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+
+/**
+ * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
+ * and writing of {@link org.apache.flink.table.data.RowData}.
+ */
+public interface FileStoreTable {
+
+    TableScan newScan(boolean incremental);
+
+    TableRead newRead(boolean incremental);
+
+    // TODO remove this once TableWrite is introduced
+    @VisibleForTesting
+    FileStore fileStore();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
new file mode 100644
index 00000000..9f9f1d08
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+
+/** Factory to create {@link FileStoreTable}. */
+public class FileStoreTableFactory {
+
+    public static FileStoreTable create(Configuration conf, String user) {
+        Path tablePath = FileStoreOptions.path(conf);
+        Schema schema =
+                new SchemaManager(tablePath)
+                        .latest()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Schema file not found in location "
+                                                        + tablePath
+                                                        + ". Please create table first."));
+
+        if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
+            return new AppendOnlyFileStoreTable(schema, conf, user);
+        } else {
+            if (schema.primaryKeys().isEmpty()) {
+                return new ChangelogValueCountFileStoreTable(schema, conf, user);
+            } else {
+                return new ChangelogWithKeyFileStoreTable(schema, conf, user);
+            }
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
new file mode 100644
index 00000000..01107bb8
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Default implementation of {@link SplitGenerator}. The whole bucket is marked as one split. */
+public class DefaultSplitGenerator implements SplitGenerator {
+
+    private final FileStorePathFactory pathFactory;
+
+    public DefaultSplitGenerator(FileStorePathFactory pathFactory) {
+        this.pathFactory = pathFactory;
+    }
+
+    @Override
+    public List<Split> generate(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFileMetas) {
+        List<Split> splits = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entryWithPartition :
+                groupedDataFileMetas.entrySet()) {
+            BinaryRowData partition = entryWithPartition.getKey();
+            for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                int bucket = entryWithBucket.getKey();
+                splits.add(
+                        new Split(
+                                partition,
+                                bucket,
+                                entryWithBucket.getValue(),
+                                pathFactory
+                                        .createDataFilePathFactory(partition, bucket)
+                                        .bucketPath()));
+            }
+        }
+        return splits;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
new file mode 100644
index 00000000..fcb12739
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.List;
+
+/** Input splits. Needed by most batch computation engines. */
+public class Split {
+
+    private final BinaryRowData partition;
+    private final int bucket;
+    private final List<DataFileMeta> files;
+
+    private final Path bucketPath;
+
+    public Split(BinaryRowData partition, int bucket, List<DataFileMeta> files, Path bucketPath) {
+        this.partition = partition;
+        this.bucket = bucket;
+        this.files = files;
+
+        this.bucketPath = bucketPath;
+    }
+
+    public BinaryRowData partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public List<DataFileMeta> files() {
+        return files;
+    }
+
+    public Path bucketPath() {
+        return bucketPath;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
new file mode 100644
index 00000000..fb0ce382
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generate {@link Split}s from a map with partition and bucket as keys and {@link DataFileMeta}s as
+ * values.
+ */
+public interface SplitGenerator {
+
+    List<Split> generate(Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFileMetas);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
new file mode 100644
index 00000000..c5989aa4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
+public abstract class TableRead {
+
+    protected final FileStoreRead read;
+
+    protected TableRead(FileStoreRead read) {
+        this.read = read;
+    }
+
+    public abstract TableRead withProjection(int[][] projection);
+
+    public RecordReader<RowData> createReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        return new RowDataRecordReader(read.createReader(partition, bucket, files));
+    }
+
+    protected abstract RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+            RecordReader.RecordIterator<KeyValue> kvRecordIterator);
+
+    private class RowDataRecordReader implements RecordReader<RowData> {
+
+        private final RecordReader<KeyValue> wrapped;
+
+        private RowDataRecordReader(RecordReader<KeyValue> wrapped) {
+            this.wrapped = wrapped;
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<RowData> readBatch() throws IOException {
+            RecordIterator<KeyValue> batch = wrapped.readBatch();
+            return batch == null ? null : rowDataRecordIteratorFromKv(batch);
+        }
+
+        @Override
+        public void close() throws IOException {
+            wrapped.close();
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
new file mode 100644
index 00000000..efd6759e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.CompoundPredicate;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
+public abstract class TableScan {
+
+    protected final FileStoreScan scan;
+    private final Schema schema;
+    private final FileStorePathFactory pathFactory;
+
+    protected TableScan(FileStoreScan scan, Schema schema, FileStorePathFactory pathFactory) {
+        this.scan = scan;
+        this.schema = schema;
+        this.pathFactory = pathFactory;
+    }
+
+    public TableScan withSnapshot(long snapshotId) {
+        scan.withSnapshot(snapshotId);
+        return this;
+    }
+
+    public TableScan withFilter(Predicate predicate) {
+        List<String> partitionKeys = schema.partitionKeys();
+        int[] fieldIdxToPartitionIdx =
+                schema.fields().stream().mapToInt(f -> partitionKeys.indexOf(f.name())).toArray();
+
+        List<Predicate> partitionFilters = new ArrayList<>();
+        List<Predicate> nonPartitionFilters = new ArrayList<>();
+        for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
+            Optional<Predicate> mapped = mapToPartitionFilter(p, fieldIdxToPartitionIdx);
+            if (mapped.isPresent()) {
+                partitionFilters.add(mapped.get());
+            } else {
+                nonPartitionFilters.add(p);
+            }
+        }
+
+        scan.withPartitionFilter(new CompoundPredicate(And.INSTANCE, partitionFilters));
+        withNonPartitionFilter(new CompoundPredicate(And.INSTANCE, nonPartitionFilters));
+        return this;
+    }
+
+    public Plan plan() {
+        FileStoreScan.Plan plan = scan.plan();
+        return new Plan(
+                plan.snapshotId(),
+                new DefaultSplitGenerator(pathFactory).generate(plan.groupByPartFiles()));
+    }
+
+    protected abstract void withNonPartitionFilter(Predicate predicate);
+
+    private Optional<Predicate> mapToPartitionFilter(
+            Predicate predicate, int[] fieldIdxToPartitionIdx) {
+        if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+            List<Predicate> children = new ArrayList<>();
+            for (Predicate child : compoundPredicate.children()) {
+                Optional<Predicate> mapped = mapToPartitionFilter(child, fieldIdxToPartitionIdx);
+                if (mapped.isPresent()) {
+                    children.add(mapped.get());
+                } else {
+                    return Optional.empty();
+                }
+            }
+            return Optional.of(new CompoundPredicate(compoundPredicate.function(), children));
+        } else {
+            LeafPredicate leafPredicate = (LeafPredicate) predicate;
+            int mapped = fieldIdxToPartitionIdx[leafPredicate.index()];
+            if (mapped >= 0) {
+                return Optional.of(
+                        new LeafPredicate(
+                                leafPredicate.function(), mapped, leafPredicate.literal()));
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
+
+    /** Scanning plan containing snapshot ID and input splits. */
+    public static class Plan {
+        public final long snapshotId;
+        public final List<Split> splits;
+
+        private Plan(long snapshotId, List<Split> splits) {
+            this.snapshotId = snapshotId;
+            this.splits = splits;
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
new file mode 100644
index 00000000..3f8b7d81
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+
+/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. */
+public class ValueContentRowDataRecordIterator implements RecordReader.RecordIterator<RowData> {
+
+    private final RecordReader.RecordIterator<KeyValue> kvIterator;
+
+    public ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> kvIterator) {
+        this.kvIterator = kvIterator;
+    }
+
+    @Override
+    public RowData next() throws IOException {
+        KeyValue kv = kvIterator.next();
+        if (kv == null) {
+            return null;
+        }
+
+        RowData rowData = kv.value();
+        // kv.value() is reused, so we need to set row kind each time
+        switch (kv.valueKind()) {
+            case ADD:
+                rowData.setRowKind(RowKind.INSERT);
+                break;
+            case DELETE:
+                rowData.setRowKind(RowKind.DELETE);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown value kind " + kv.valueKind().name());
+        }
+        return rowData;
+    }
+
+    @Override
+    public void releaseBatch() {
+        kvIterator.releaseBatch();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
new file mode 100644
index 00000000..1bb2f2a5
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * An {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to several {@link RowData}
+ * according to its key. These {@link RowData}s are the same. The number of rows depends on the
+ * value of {@link KeyValue}.
+ */
+public class ValueCountRowDataRecordIterator implements RecordReader.RecordIterator<RowData> {
+
+    private final RecordReader.RecordIterator<KeyValue> kvIterator;
+    private final @Nullable ProjectedRowData projectedRowData;
+
+    private RowData rowData;
+    private long count;
+
+    public ValueCountRowDataRecordIterator(
+            RecordReader.RecordIterator<KeyValue> kvIterator, @Nullable int[][] projection) {
+        this.kvIterator = kvIterator;
+        this.projectedRowData = projection == null ? null : ProjectedRowData.from(projection);
+
+        this.rowData = null;
+        this.count = 0;
+    }
+
+    @Override
+    public RowData next() throws IOException {
+        while (true) {
+            if (count > 0) {
+                count--;
+                return rowData;
+            } else {
+                KeyValue kv = kvIterator.next();
+                if (kv == null) {
+                    return null;
+                }
+
+                if (projectedRowData == null) {
+                    rowData = kv.key();
+                } else {
+                    rowData = projectedRowData.replaceRow(kv.key());
+                }
+                long value = kv.value().getLong(0);
+                // kv.value() is reused, so we need to set row kind each time
+                if (value > 0) {
+                    rowData.setRowKind(RowKind.INSERT);
+                } else {
+                    rowData.setRowKind(RowKind.DELETE);
+                }
+                count = Math.abs(value);
+            }
+        }
+    }
+
+    @Override
+    public void releaseBatch() {
+        kvIterator.releaseBatch();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 03a81bdd..e1688f2b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -111,7 +111,6 @@ public class TestFileStore extends FileStoreImpl {
             RowType valueType,
             MergeFunction mergeFunction) {
         super(
-                options.path().toString(),
                 0L,
                 options,
                 WriteMode.CHANGE_LOG,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
new file mode 100644
index 00000000..dd190998
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PredicateBuilder}. */
+public class PredicateBuilderTest {
+
+    @Test
+    public void testSplitAnd() {
+        Predicate child1 =
+                PredicateBuilder.or(
+                        PredicateBuilder.isNull(0),
+                        PredicateBuilder.isNull(1),
+                        PredicateBuilder.isNull(2));
+        Predicate child2 =
+                PredicateBuilder.and(
+                        PredicateBuilder.isNull(3),
+                        PredicateBuilder.isNull(4),
+                        PredicateBuilder.isNull(5));
+        Predicate child3 = PredicateBuilder.isNull(6);
+
+        assertThat(PredicateBuilder.splitAnd(PredicateBuilder.and(child1, child2, child3)))
+                .isEqualTo(
+                        Arrays.asList(
+                                child1,
+                                PredicateBuilder.isNull(3),
+                                PredicateBuilder.isNull(4),
+                                PredicateBuilder.isNull(5),
+                                child3));
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
index 3e6a53ac..25581938 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
@@ -33,7 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * A simple test data structure which is mainly used for testing if other components handle reuse of
- * {@link KeyValue} correctly. Use along with {@link ReusingTestData}.
+ * {@link KeyValue} correctly. Use along with {@link ReusingKeyValue}.
  */
 public class ReusingTestData implements Comparable<ReusingTestData> {
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/RowDataRecordIteratorTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/RowDataRecordIteratorTestBase.java
new file mode 100644
index 00000000..e422514d
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/RowDataRecordIteratorTestBase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** Tests for {@link RecordReader.RecordIterator} of {@link RowData}. */
+public abstract class RowDataRecordIteratorTestBase {
+
+    protected void testIterator(
+            List<ReusingTestData> input,
+            Function<RecordReader.RecordIterator<KeyValue>, RecordReader.RecordIterator<RowData>>
+                    rowDataIteratorSupplier,
+            BiConsumer<RowData, Integer> resultChecker)
+            throws Exception {
+        int cnt = 0;
+        TestReusingRecordReader recordReader = new TestReusingRecordReader(input);
+        while (true) {
+            RecordReader.RecordIterator<KeyValue> kvIterator = recordReader.readBatch();
+            if (kvIterator == null) {
+                break;
+            }
+            RecordReader.RecordIterator<RowData> rowDataIterator =
+                    rowDataIteratorSupplier.apply(kvIterator);
+            RowData rowData;
+            while (true) {
+                rowData = rowDataIterator.next();
+                if (rowData == null) {
+                    break;
+                }
+                resultChecker.accept(rowData, cnt);
+                cnt++;
+            }
+            rowDataIterator.releaseBatch();
+        }
+        recordReader.close();
+        recordReader.assertCleanUp();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIteratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIteratorTest.java
new file mode 100644
index 00000000..601059f2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIteratorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ValueContentRowDataRecordIterator}. */
+public class ValueContentRowDataRecordIteratorTest extends RowDataRecordIteratorTestBase {
+
+    @Test
+    public void testIterator() throws Exception {
+        List<ReusingTestData> input =
+                ReusingTestData.parse("1, 1, +, 100 | 2, 2, +, 200 | 1, 3, -, 100 | 2, 4, +, 300");
+        List<Long> expectedValues = Arrays.asList(100L, 200L, 100L, 300L);
+        List<RowKind> expectedRowKinds =
+                Arrays.asList(RowKind.INSERT, RowKind.INSERT, RowKind.DELETE, RowKind.INSERT);
+
+        testIterator(
+                input,
+                ValueContentRowDataRecordIterator::new,
+                (rowData, idx) -> {
+                    assertThat(rowData.getArity()).isEqualTo(1);
+                    assertThat(rowData.getLong(0)).isEqualTo(expectedValues.get(idx));
+                    assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
+                });
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
new file mode 100644
index 00000000..0f6dd6f5
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ValueCountRowDataRecordIterator}. */
+public class ValueCountRowDataRecordIteratorTest extends RowDataRecordIteratorTestBase {
+
+    @Test
+    public void testIteratorWithoutProjection() throws Exception {
+        List<ReusingTestData> input =
+                ReusingTestData.parse("1, 1, +, 3 | 2, 2, +, 1 | 1, 3, +, -2 | 2, 4, +, -1");
+        List<Integer> expectedValues = Arrays.asList(1, 1, 1, 2, 1, 1, 2);
+        List<RowKind> expectedRowKinds =
+                Arrays.asList(
+                        RowKind.INSERT,
+                        RowKind.INSERT,
+                        RowKind.INSERT,
+                        RowKind.INSERT,
+                        RowKind.DELETE,
+                        RowKind.DELETE,
+                        RowKind.DELETE);
+
+        testIterator(
+                input,
+                kvIterator -> new ValueCountRowDataRecordIterator(kvIterator, null),
+                (rowData, idx) -> {
+                    assertThat(rowData.getArity()).isEqualTo(1);
+                    assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
+                    assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
+                });
+    }
+
+    @Test
+    public void testIteratorWithProjection() throws Exception {
+        List<ReusingTestData> input =
+                ReusingTestData.parse("1, 1, +, 3 | 2, 2, +, 1 | 1, 3, +, -2 | 2, 4, +, -1");
+        List<Integer> expectedValues = Arrays.asList(1, 1, 1, 2, 1, 1, 2);
+        List<RowKind> expectedRowKinds =
+                Arrays.asList(
+                        RowKind.INSERT,
+                        RowKind.INSERT,
+                        RowKind.INSERT,
+                        RowKind.INSERT,
+                        RowKind.DELETE,
+                        RowKind.DELETE,
+                        RowKind.DELETE);
+
+        testIterator(
+                input,
+                kvIterator ->
+                        new ValueCountRowDataRecordIterator(
+                                kvIterator, new int[][] {new int[] {0}, new int[] {0}}),
+                (rowData, idx) -> {
+                    assertThat(rowData.getArity()).isEqualTo(2);
+                    assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
+                    assertThat(rowData.getInt(1)).isEqualTo(expectedValues.get(idx));
+                    assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
+                });
+    }
+}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index d65a857f..9af0648f 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -136,6 +136,7 @@ public class TableStoreJobConf {
     }
 
     public void updateFileStoreOptions(Configuration fileStoreOptions) {
+        fileStoreOptions.set(FileStoreOptions.PATH, getLocation());
         for (Map.Entry<String, String> entry :
                 jobConf.getPropsWithPrefix(INTERNAL_TBLPROPERTIES_PREFIX).entrySet()) {
             fileStoreOptions.setString(entry.getKey(), entry.getValue());
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 76352c90..25f9dcda 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -19,21 +19,13 @@
 package org.apache.flink.table.store.mapred;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
 import org.apache.flink.table.store.TableStoreJobConf;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableScan;
 
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -46,15 +38,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * {@link InputFormat} for table store. It divides all files into {@link InputSplit}s (one split per
@@ -64,155 +49,45 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
 
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
-        FileStoreWrapper wrapper = new FileStoreWrapper(jobConf);
-        FileStoreScan scan = wrapper.newScan();
-        List<TableStoreInputSplit> result = new ArrayList<>();
-        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> pe :
-                scan.plan().groupByPartFiles().entrySet()) {
-            for (Map.Entry<Integer, List<DataFileMeta>> be : pe.getValue().entrySet()) {
-                BinaryRowData partition = pe.getKey();
-                int bucket = be.getKey();
-                String bucketPath =
-                        wrapper.store
-                                .pathFactory()
-                                .createDataFilePathFactory(partition, bucket)
-                                .bucketPath()
-                                .toString();
-                TableStoreInputSplit split =
-                        new TableStoreInputSplit(partition, bucket, be.getValue(), bucketPath);
-                result.add(split);
-            }
-        }
-        return result.toArray(new TableStoreInputSplit[0]);
+        FileStoreTable table = createFileStoreTable(jobConf);
+        TableScan scan = table.newScan(false);
+        createPredicate(jobConf).ifPresent(scan::withFilter);
+        return scan.plan().splits.stream()
+                .map(TableStoreInputSplit::create)
+                .toArray(TableStoreInputSplit[]::new);
     }
 
     @Override
     public RecordReader<Void, RowDataContainer> getRecordReader(
             InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
-        FileStoreWrapper wrapper = new FileStoreWrapper(jobConf);
-        FileStoreRead read = wrapper.store.newRead();
+        FileStoreTable table = createFileStoreTable(jobConf);
         TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
-        org.apache.flink.table.store.file.utils.RecordReader<KeyValue> wrapped =
-                read.withDropDelete(true)
-                        .createReader(split.partition(), split.bucket(), split.files());
         long splitLength = split.getLength();
         return new TableStoreRecordReader(
-                wrapped,
-                !new TableStoreJobConf(jobConf).getPrimaryKeyNames().isPresent(),
+                table.newRead(false).createReader(split.partition(), split.bucket(), split.files()),
                 splitLength);
     }
 
-    private static class FileStoreWrapper {
-
-        private List<String> columnNames;
-        private List<LogicalType> columnTypes;
-
-        private FileStoreImpl store;
-        private boolean valueCountMode;
-        @Nullable private Predicate predicate;
-
-        private FileStoreWrapper(JobConf jobConf) {
-            createFileStore(jobConf);
-            createPredicate(jobConf);
-        }
-
-        private void createFileStore(JobConf jobConf) {
-            TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
-
-            String dbName = wrapper.getDbName();
-            String tableName = wrapper.getTableName();
-
-            Configuration options = new Configuration();
-            String tableLocation = wrapper.getLocation();
-            wrapper.updateFileStoreOptions(options);
-
-            String user = wrapper.getFileStoreUser();
-
-            columnNames = wrapper.getColumnNames();
-            columnTypes = wrapper.getColumnTypes();
-
-            List<String> partitionColumnNames = wrapper.getPartitionColumnNames();
-
-            RowType rowType =
-                    RowType.of(
-                            columnTypes.toArray(new LogicalType[0]),
-                            columnNames.toArray(new String[0]));
-            LogicalType[] partitionLogicalTypes =
-                    partitionColumnNames.stream()
-                            .map(s -> columnTypes.get(columnNames.indexOf(s)))
-                            .toArray(LogicalType[]::new);
-            RowType partitionType =
-                    RowType.of(partitionLogicalTypes, partitionColumnNames.toArray(new String[0]));
-
-            Optional<List<String>> optionalPrimaryKeyNames = wrapper.getPrimaryKeyNames();
-            if (optionalPrimaryKeyNames.isPresent()) {
-                Function<String, RowType.RowField> rowFieldMapper =
-                        s -> {
-                            int idx = columnNames.indexOf(s);
-                            Preconditions.checkState(
-                                    idx >= 0,
-                                    "Primary key column "
-                                            + s
-                                            + " not found in table "
-                                            + dbName
-                                            + "."
-                                            + tableName);
-                            return new RowType.RowField(s, columnTypes.get(idx));
-                        };
-                RowType primaryKeyType =
-                        new RowType(
-                                optionalPrimaryKeyNames.get().stream()
-                                        .map(rowFieldMapper)
-                                        .collect(Collectors.toList()));
-                store =
-                        FileStoreImpl.createWithPrimaryKey(
-                                tableLocation,
-                                0, // TODO
-                                new FileStoreOptions(options),
-                                user,
-                                partitionType,
-                                primaryKeyType,
-                                rowType,
-                                options.get(FileStoreOptions.MERGE_ENGINE));
-                valueCountMode = false;
-            } else {
-                store =
-                        FileStoreImpl.createWithValueCount(
-                                tableLocation,
-                                0, // TODO
-                                new FileStoreOptions(options),
-                                user,
-                                partitionType,
-                                rowType);
-                valueCountMode = true;
-            }
-        }
-
-        private void createPredicate(JobConf jobConf) {
-            String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-            if (hiveFilter == null) {
-                return;
-            }
+    private FileStoreTable createFileStoreTable(JobConf jobConf) {
+        TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
+        Configuration conf = new Configuration();
+        wrapper.updateFileStoreOptions(conf);
+        return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
+    }
 
-            ExprNodeGenericFuncDesc exprNodeDesc =
-                    SerializationUtilities.deserializeObject(
-                            hiveFilter, ExprNodeGenericFuncDesc.class);
-            SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
-            SearchArgumentToPredicateConverter converter =
-                    new SearchArgumentToPredicateConverter(sarg, columnNames, columnTypes);
-            predicate = converter.convert().orElse(null);
+    private Optional<Predicate> createPredicate(JobConf jobConf) {
+        String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+        if (hiveFilter == null) {
+            return Optional.empty();
         }
 
-        private FileStoreScanImpl newScan() {
-            FileStoreScanImpl scan = store.newScan();
-            if (predicate != null) {
-                if (valueCountMode) {
-                    scan.withKeyFilter(predicate);
-                } else {
-                    scan.withValueFilter(predicate);
-                }
-            }
-            return scan;
-        }
+        TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
+        ExprNodeGenericFuncDesc exprNodeDesc =
+                SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
+        SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
+        SearchArgumentToPredicateConverter converter =
+                new SearchArgumentToPredicateConverter(
+                        sarg, wrapper.getColumnNames(), wrapper.getColumnTypes());
+        return converter.convert();
     }
 }
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index 924167b8..4bbef384 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.utils.SerializationUtils;
+import org.apache.flink.table.store.table.source.Split;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -48,7 +49,6 @@ public class TableStoreInputSplit extends FileSplit {
     private BinaryRowData partition;
     private int bucket;
     private List<DataFileMeta> files;
-
     private String bucketPath;
 
     // public no-argument constructor for deserialization
@@ -59,10 +59,14 @@ public class TableStoreInputSplit extends FileSplit {
         this.partition = partition;
         this.bucket = bucket;
         this.files = files;
-
         this.bucketPath = bucketPath;
     }
 
+    public static TableStoreInputSplit create(Split split) {
+        return new TableStoreInputSplit(
+                split.partition(), split.bucket(), split.files(), split.bucketPath().toString());
+    }
+
     public BinaryRowData partition() {
         return partition;
     }
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
index c5dbb2cb..bf1f493c 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
@@ -21,14 +21,11 @@ package org.apache.flink.table.store.mapred;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.utils.PrimaryKeyRowDataSupplier;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.ValueCountRowDataSupplier;
 
 import org.apache.hadoop.mapred.RecordReader;
 
 import java.io.IOException;
-import java.util.function.Supplier;
 
 /**
  * Base {@link RecordReader} for table store. Reads {@link KeyValue}s from data files and picks out
@@ -36,28 +33,22 @@ import java.util.function.Supplier;
  */
 public class TableStoreRecordReader implements RecordReader<Void, RowDataContainer> {
 
-    private final RecordReaderIterator<KeyValue> iterator;
-    private final Supplier<RowData> rowDataSupplier;
+    private final RecordReaderIterator<RowData> iterator;
     private final long splitLength;
 
     private float progress;
 
     public TableStoreRecordReader(
-            org.apache.flink.table.store.file.utils.RecordReader<KeyValue> wrapped,
-            boolean valueCountMode,
+            org.apache.flink.table.store.file.utils.RecordReader<RowData> wrapped,
             long splitLength) {
         this.iterator = new RecordReaderIterator<>(wrapped);
-        this.rowDataSupplier =
-                valueCountMode
-                        ? new ValueCountRowDataSupplier(iterator::next)
-                        : new PrimaryKeyRowDataSupplier(iterator::next);
         this.splitLength = splitLength;
         this.progress = 0;
     }
 
     @Override
     public boolean next(Void key, RowDataContainer value) throws IOException {
-        RowData rowData = rowDataSupplier.get();
+        RowData rowData = iterator.next();
         if (rowData == null) {
             progress = 1;
             return false;
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
index c0b83c7c..13b007fc 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
@@ -20,23 +20,25 @@ package org.apache.flink.table.store;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
-import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
-import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
-import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Collections;
@@ -48,12 +50,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /** Helper class to write and read {@link RowData} with {@link FileStoreImpl}. */
 public class FileStoreTestHelper {
 
-    private final FileStoreImpl store;
+    private final FileStoreTable table;
+    private final FileStore store;
     private final BiFunction<RowData, RowData, BinaryRowData> partitionCalculator;
     private final Function<RowData, Integer> bucketCalculator;
     private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
@@ -61,24 +63,18 @@ public class FileStoreTestHelper {
 
     public FileStoreTestHelper(
             Configuration conf,
-            RowType partitionType,
-            RowType keyType,
-            RowType valueType,
-            MergeFunction mergeFunction,
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
             BiFunction<RowData, RowData, BinaryRowData> partitionCalculator,
-            Function<RowData, Integer> bucketCalculator) {
-        FileStoreOptions options = new FileStoreOptions(conf);
-        this.store =
-                new FileStoreImpl(
-                        options.path().toString(),
-                        0,
-                        options,
-                        WriteMode.CHANGE_LOG,
-                        UUID.randomUUID().toString(),
-                        partitionType,
-                        keyType,
-                        valueType,
-                        mergeFunction);
+            Function<RowData, Integer> bucketCalculator)
+            throws Exception {
+        Path tablePath = FileStoreOptions.path(conf);
+        new SchemaManager(tablePath)
+                .commitNewVersion(
+                        new UpdateSchema(rowType, partitionKeys, primaryKeys, new HashMap<>(), ""));
+        this.table = FileStoreTableFactory.create(conf, "user");
+        this.store = table.fileStore();
         this.partitionCalculator = partitionCalculator;
         this.bucketCalculator = bucketCalculator;
         this.writers = new HashMap<>();
@@ -94,7 +90,7 @@ public class FileStoreTestHelper {
                                 bucket,
                                 (b, w) -> {
                                     if (w == null) {
-                                        FileStoreWriteImpl write = store.newWrite();
+                                        FileStoreWrite write = store.newWrite();
                                         return write.createWriter(
                                                 partition, bucket, compactExecutor);
                                     } else {
@@ -119,21 +115,20 @@ public class FileStoreTestHelper {
             }
         }
         writers.clear();
-        FileStoreCommitImpl commit = store.newCommit();
+        FileStoreCommit commit = store.newCommit();
         commit.commit(committable, Collections.emptyMap());
     }
 
-    public Tuple2<RecordReader<KeyValue>, Long> read(BinaryRowData partition, int bucket)
+    public Tuple2<RecordReader<RowData>, Long> read(BinaryRowData partition, int bucket)
             throws Exception {
-        FileStoreScanImpl scan = store.newScan();
-        scan.withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket);
-        List<ManifestEntry> files = scan.plan().files();
-        FileStoreReadImpl read = store.newRead();
-        RecordReader<KeyValue> wrapped =
-                read.createReader(
-                        partition,
-                        bucket,
-                        files.stream().map(ManifestEntry::file).collect(Collectors.toList()));
-        return Tuple2.of(wrapped, files.stream().mapToLong(e -> e.file().fileSize()).sum());
+        for (Split split : table.newScan(false).plan().splits) {
+            if (split.partition().equals(partition) && split.bucket() == bucket) {
+                return Tuple2.of(
+                        table.newRead(false).createReader(partition, bucket, split.files()),
+                        split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
+            }
+        }
+        throw new IllegalArgumentException(
+                "Input split not found for partition " + partition + " and bucket " + bucket);
     }
 }
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 5f33a37e..2ff8d7a3 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -28,8 +28,6 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.FileStoreTestHelper;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -90,13 +88,6 @@ public class TableStoreHiveStorageHandlerITCase {
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
                         conf,
-                        RowType.of(),
-                        RowType.of(
-                                new LogicalType[] {
-                                    DataTypes.INT().getLogicalType(),
-                                    DataTypes.BIGINT().getLogicalType()
-                                },
-                                new String[] {"_KEY_a", "_KEY_b"}),
                         RowType.of(
                                 new LogicalType[] {
                                     DataTypes.INT().getLogicalType(),
@@ -104,7 +95,8 @@ public class TableStoreHiveStorageHandlerITCase {
                                     DataTypes.STRING().getLogicalType()
                                 },
                                 new String[] {"a", "b", "c"}),
-                        new DeduplicateMergeFunction(),
+                        Collections.emptyList(),
+                        Arrays.asList("a", "b"),
                         (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
                         k -> k.getInt(0) % 2);
 
@@ -166,7 +158,6 @@ public class TableStoreHiveStorageHandlerITCase {
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
                         conf,
-                        RowType.of(),
                         RowType.of(
                                 new LogicalType[] {
                                     DataTypes.INT().getLogicalType(),
@@ -174,10 +165,8 @@ public class TableStoreHiveStorageHandlerITCase {
                                     DataTypes.STRING().getLogicalType()
                                 },
                                 new String[] {"a", "b", "c"}),
-                        RowType.of(
-                                new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
-                                new String[] {"_VALUE_COUNT"}),
-                        new ValueCountMergeFunction(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
                         (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
                         k -> k.getInt(0) % 2);
 
@@ -238,10 +227,6 @@ public class TableStoreHiveStorageHandlerITCase {
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
                         conf,
-                        RowType.of(),
-                        RowType.of(
-                                new LogicalType[] {DataTypes.INT().getLogicalType()},
-                                new String[] {"_KEY_f_int"}),
                         RowType.of(
                                 RandomGenericRowDataGenerator.TYPE_INFOS.stream()
                                         .map(
@@ -250,7 +235,8 @@ public class TableStoreHiveStorageHandlerITCase {
                                                                 .getLogicalType())
                                         .toArray(LogicalType[]::new),
                                 RandomGenericRowDataGenerator.FIELD_NAMES.toArray(new String[0])),
-                        new DeduplicateMergeFunction(),
+                        Collections.emptyList(),
+                        Collections.singletonList("f_int"),
                         (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
                         k -> 0);
 
@@ -382,14 +368,11 @@ public class TableStoreHiveStorageHandlerITCase {
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
                         conf,
-                        RowType.of(),
                         RowType.of(
                                 new LogicalType[] {DataTypes.INT().getLogicalType()},
                                 new String[] {"a"}),
-                        RowType.of(
-                                new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
-                                new String[] {"_VALUE_COUNT"}),
-                        new ValueCountMergeFunction(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
                         (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
                         k -> 0);
 
@@ -474,17 +457,14 @@ public class TableStoreHiveStorageHandlerITCase {
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
                         conf,
-                        RowType.of(),
                         RowType.of(
                                 new LogicalType[] {
                                     DataTypes.DATE().getLogicalType(),
                                     DataTypes.TIMESTAMP(3).getLogicalType()
                                 },
                                 new String[] {"dt", "ts"}),
-                        RowType.of(
-                                new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
-                                new String[] {"_VALUE_COUNT"}),
-                        new ValueCountMergeFunction(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
                         (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
                         k -> 0);
 
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 4fc6fa53..0edb2548 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -28,10 +28,7 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.FileStoreTestHelper;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +36,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -59,17 +57,14 @@ public class TableStoreRecordReaderTest {
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
                         conf,
-                        RowType.of(),
-                        RowType.of(
-                                new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
-                                new String[] {"_KEY_a"}),
                         RowType.of(
                                 new LogicalType[] {
                                     DataTypes.BIGINT().getLogicalType(),
                                     DataTypes.STRING().getLogicalType()
                                 },
                                 new String[] {"a", "b"}),
-                        new DeduplicateMergeFunction(),
+                        Collections.emptyList(),
+                        Collections.singletonList("a"),
                         (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
                         k -> 0);
 
@@ -95,8 +90,8 @@ public class TableStoreRecordReaderTest {
                 GenericRowData.of(2L, StringData.fromString("Hello")));
         helper.commit();
 
-        Tuple2<RecordReader<KeyValue>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
-        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, false, tuple.f1);
+        Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
         RowDataContainer container = reader.createValue();
         Set<String> actual = new HashSet<>();
         while (reader.next(null, container)) {
@@ -119,17 +114,14 @@ public class TableStoreRecordReaderTest {
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
                         conf,
-                        RowType.of(),
                         RowType.of(
                                 new LogicalType[] {
                                     DataTypes.INT().getLogicalType(),
                                     DataTypes.STRING().getLogicalType()
                                 },
-                                new String[] {"_KEY_a", "_KEY_b"}),
-                        RowType.of(
-                                new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
-                                new String[] {"_VALUE_COUNT"}),
-                        new ValueCountMergeFunction(),
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
                         (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
                         k -> 0);
 
@@ -159,8 +151,8 @@ public class TableStoreRecordReaderTest {
                 GenericRowData.of(1L));
         helper.commit();
 
-        Tuple2<RecordReader<KeyValue>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
-        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, true, tuple.f1);
+        Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
         RowDataContainer container = reader.createValue();
         Map<String, Integer> actual = new HashMap<>();
         while (reader.next(null, container)) {