You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/30 06:41:50 UTC

[flink-table-store] branch master updated: [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c3cd1d [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory
c8c3cd1d is described below

commit c8c3cd1d93012d6eba6a7f7639435d489e0cffd8
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jun 30 14:41:47 2022 +0800

    [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory
    
    This closes #180
---
 flink-table-store-common/pom.xml                   |  23 +++
 .../flink/table/store/format}/FieldStats.java      |   2 +-
 .../table/store/format}/FieldStatsCollector.java   |  16 +-
 .../flink/table/store/format/FileFormat.java       | 107 ++++++++++++
 .../table/store}/format/FileFormatFactory.java     |   2 +-
 .../table/store/format}/FileStatsExtractor.java    |   2 +-
 .../utils/RowDataToObjectArrayConverter.java       |   2 +-
 .../store/format}/FileStatsExtractorTestBase.java  |   7 +-
 .../flink/table/store}/utils/RowDataUtilsTest.java |   0
 flink-table-store-connector/pom.xml                |  55 ++----
 .../source/TestChangelogDataReadWrite.java         |   8 +-
 flink-table-store-core/pom.xml                     |   4 +-
 .../flink/table/store/file/FileStoreOptions.java   |   8 +-
 .../table/store/file/data/AppendOnlyWriter.java    |   2 +-
 .../table/store/file/data/DataFileReader.java      |   2 +-
 .../table/store/file/data/DataFileWriter.java      |   6 +-
 .../flink/table/store/file/format/FileFormat.java  | 184 ---------------------
 .../table/store/file/format/FileFormatImpl.java    |  57 -------
 .../table/store/file/manifest/ManifestFile.java    |  11 +-
 .../table/store/file/manifest/ManifestList.java    |   2 +-
 .../file/operation/AbstractFileStoreScan.java      |   2 +-
 .../file/operation/AppendOnlyFileStoreRead.java    |   2 +-
 .../file/operation/AppendOnlyFileStoreWrite.java   |   2 +-
 .../store/file/operation/FileStoreCommitImpl.java  |   2 +-
 .../file/operation/KeyValueFileStoreRead.java      |   2 +-
 .../file/operation/KeyValueFileStoreWrite.java     |   2 +-
 .../flink/table/store/file/predicate/And.java      |   2 +-
 .../store/file/predicate/CompoundPredicate.java    |   2 +-
 .../flink/table/store/file/predicate/Equal.java    |   2 +-
 .../table/store/file/predicate/GreaterOrEqual.java |   2 +-
 .../table/store/file/predicate/GreaterThan.java    |   2 +-
 .../table/store/file/predicate/IsNotNull.java      |   2 +-
 .../flink/table/store/file/predicate/IsNull.java   |   2 +-
 .../store/file/predicate/LeafBinaryFunction.java   |   2 +-
 .../table/store/file/predicate/LeafFunction.java   |   2 +-
 .../table/store/file/predicate/LeafPredicate.java  |   2 +-
 .../store/file/predicate/LeafUnaryFunction.java    |   2 +-
 .../table/store/file/predicate/LessOrEqual.java    |   2 +-
 .../flink/table/store/file/predicate/LessThan.java |   2 +-
 .../flink/table/store/file/predicate/NotEqual.java |   2 +-
 .../flink/table/store/file/predicate/Or.java       |   2 +-
 .../table/store/file/predicate/Predicate.java      |   2 +-
 .../table/store/file/predicate/StartsWith.java     |   2 +-
 .../table/store/file/stats/BinaryTableStats.java   |   1 +
 .../file/stats/FieldStatsArraySerializer.java      |   3 +-
 .../flink/table/store/file/writer/Metric.java      |   2 +-
 .../table/store/file/writer/MetricFileWriter.java  |   8 +-
 .../flink/table/store/file/FileFormatTest.java     |  11 +-
 .../store/file/data/AppendOnlyWriterTest.java      |   8 +-
 .../store/file/data/DataFileTestDataGenerator.java |  12 +-
 .../store/file}/format/FileFormatSuffixTest.java   |  20 +--
 .../file/format/FileStatsExtractingAvroFormat.java |  27 ++-
 .../FileStatsExtractingAvroFormatFactory.java      |   2 +
 .../store/file/format/FlushingFileFormat.java      |  23 ++-
 .../store/file/manifest/ManifestFileMetaTest.java  |   6 +-
 .../store/file/manifest/ManifestFileTest.java      |   6 +-
 .../store/file/manifest/ManifestListTest.java      |   6 +-
 .../file/manifest/ManifestTestDataGenerator.java   |   7 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |   2 +-
 .../table/store/file/predicate/PredicateTest.java  |   2 +-
 .../store/file/stats/FieldStatsCollectorTest.java  |  10 +-
 .../table/store/file/stats/StatsTestUtils.java     |   1 +
 .../store/file/stats/TestFileStatsExtractor.java   |   7 +-
 ...che.flink.table.store.format.FileFormatFactory} |   0
 .../store/tests/FileStoreFlinkFormatE2eTest.java   |  63 -------
 flink-table-store-format/pom.xml                   |  30 ++--
 .../table/store/format/avro/AvroFileFormat.java    | 150 +++++++++++++++--
 .../store/format/avro/AvroFileFormatFactory.java   |   4 +-
 .../table/store/format/orc/OrcFileFormat.java      |  85 ++++++++--
 .../store/format/orc/OrcFileFormatFactory.java     |   6 +-
 .../store/format/orc/OrcFileStatsExtractor.java    |   4 +-
 ...che.flink.table.store.format.FileFormatFactory} |   0
 .../table/store/format/BulkFileFormatTest.java     |  16 +-
 .../format/orc/OrcFileStatsExtractorTest.java      |   7 +-
 74 files changed, 531 insertions(+), 544 deletions(-)

diff --git a/flink-table-store-common/pom.xml b/flink-table-store-common/pom.xml
index c6f5cbbf..f9e8a905 100644
--- a/flink-table-store-common/pom.xml
+++ b/flink-table-store-common/pom.xml
@@ -38,5 +38,28 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
index 55a07424..d2ec5467 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
 
 import javax.annotation.Nullable;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStatsCollector.java
similarity index 85%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStatsCollector.java
index 5611a16f..e86fa406 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStatsCollector.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 /** Collector to extract statistics of each field from a series of records. */
@@ -32,7 +32,6 @@ public class FieldStatsCollector {
     private final long[] nullCounts;
     private final RowDataToObjectArrayConverter converter;
     private final TypeSerializer<Object>[] fieldSerializers;
-    private final FieldStatsArraySerializer fieldStatsArraySerializer;
 
     public FieldStatsCollector(RowType rowType) {
         int numFields = rowType.getFieldCount();
@@ -44,7 +43,6 @@ public class FieldStatsCollector {
         for (int i = 0; i < numFields; i++) {
             fieldSerializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
         }
-        this.fieldStatsArraySerializer = new FieldStatsArraySerializer(rowType);
     }
 
     /**
@@ -76,15 +74,7 @@ public class FieldStatsCollector {
         }
     }
 
-    public BinaryTableStats extract() {
-        return toBinary(extractFieldStats());
-    }
-
-    public BinaryTableStats toBinary(FieldStats[] stats) {
-        return fieldStatsArraySerializer.toBinary(stats);
-    }
-
-    public FieldStats[] extractFieldStats() {
+    public FieldStats[] extract() {
         FieldStats[] stats = new FieldStats[nullCounts.length];
         for (int i = 0; i < stats.length; i++) {
             stats[i] = new FieldStats(minValues[i], maxValues[i], nullCounts[i]);
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
new file mode 100644
index 00000000..8bfcd2dd
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
+
+/**
+ * Factory class which creates reader and writer factories for a specific file format.
+ *
+ * <p>NOTE: This class must be thread safe.
+ */
+public abstract class FileFormat {
+
+    protected String formatIdentifier;
+
+    protected FileFormat(String formatIdentifier) {
+        this.formatIdentifier = formatIdentifier;
+    }
+
+    public String getFormatIdentifier() {
+        return formatIdentifier;
+    }
+
+    /**
+     * Create a {@link BulkFormat} from the type, with projection pushed down.
+     *
+     * @param type Type without projection.
+     * @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
+     * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
+     */
+    public abstract BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters);
+
+    /** Create a {@link BulkWriter.Factory} from the type. */
+    public abstract BulkWriter.Factory<RowData> createWriterFactory(RowType type);
+
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
+        int[][] projection = new int[rowType.getFieldCount()][];
+        for (int i = 0; i < projection.length; i++) {
+            projection[i] = new int[] {i};
+        }
+        return createReaderFactory(rowType, projection);
+    }
+
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType rowType, int[][] projection) {
+        return createReaderFactory(rowType, projection, new ArrayList<>());
+    }
+
+    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+        return Optional.empty();
+    }
+
+    /** Create a {@link FileFormat} from table options. */
+    public static FileFormat fromTableOptions(
+            Configuration tableOptions, ConfigOption<String> formatOption) {
+        String formatIdentifier = tableOptions.get(formatOption);
+        DelegatingConfiguration formatOptions =
+                new DelegatingConfiguration(tableOptions, formatIdentifier + ".");
+        return fromIdentifier(formatIdentifier, formatOptions);
+    }
+
+    /** Create a {@link FileFormat} from format identifier and format options. */
+    public static FileFormat fromIdentifier(String formatIdentifier, Configuration formatOptions) {
+        ServiceLoader<FileFormatFactory> serviceLoader =
+                ServiceLoader.load(FileFormatFactory.class);
+        for (FileFormatFactory factory : serviceLoader) {
+            if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
+                return factory.create(formatOptions);
+            }
+        }
+        throw new ValidationException(
+                String.format(
+                        "Could not find any factories that implement '%s' in the classpath.",
+                        FileFormatFactory.class.getName()));
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormatFactory.java
similarity index 95%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormatFactory.java
index 088c11cd..f0ff0c01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormatFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.format;
+package org.apache.flink.table.store.format;
 
 import org.apache.flink.configuration.Configuration;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileStatsExtractor.java
similarity index 95%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileStatsExtractor.java
index 3f56d3b3..384a2737 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileStatsExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
 
 import org.apache.flink.core.fs.Path;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
index 0def1c1c..ad708451 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.utils;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
similarity index 97%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
rename to flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 42b06c67..90a53f7f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -29,7 +29,6 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryStringData;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -50,7 +49,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link FileStatsExtractor}. */
+/** Tests for {@link org.apache.flink.table.store.format.FileStatsExtractor}. */
 public abstract class FileStatsExtractorTestBase {
 
     @TempDir java.nio.file.Path tempDir;
@@ -76,7 +75,7 @@ public abstract class FileStatsExtractorTestBase {
         for (GenericRowData row : data) {
             collector.collect(row);
         }
-        FieldStats[] expected = collector.extractFieldStats();
+        FieldStats[] expected = collector.extract();
 
         FileStatsExtractor extractor = format.createStatsExtractor(rowType).get();
         assertThat(extractor).isNotNull();
diff --git a/flink-table-store-common/src/test/java/org.apache.flink.table.store/utils/RowDataUtilsTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
similarity index 100%
rename from flink-table-store-common/src/test/java/org.apache.flink.table.store/utils/RowDataUtilsTest.java
rename to flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index dfde4ed4..6af82508 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -70,6 +70,21 @@ under the License.
 
         <!-- test dependencies -->
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-format</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Tests: Hadoop required by ORC -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
@@ -135,13 +150,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
@@ -157,38 +165,6 @@ under the License.
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-orc</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Tests: Hadoop required by ORC -->
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs-client</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
@@ -208,6 +184,7 @@ under the License.
             </exclusions>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <!-- include 2.0 server for tests  -->
             <groupId>org.apache.kafka</groupId>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index feba9da2..a99d05c0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
@@ -39,6 +38,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
@@ -74,11 +74,7 @@ public class TestChangelogDataReadWrite {
     private final ExecutorService service;
 
     public TestChangelogDataReadWrite(String root, ExecutorService service) {
-        this.avro =
-                FileFormat.fromIdentifier(
-                        Thread.currentThread().getContextClassLoader(),
-                        "avro",
-                        new Configuration());
+        this.avro = FileFormat.fromIdentifier("avro", new Configuration());
         this.tablePath = new Path(root);
         this.pathFactory =
                 new FileStorePathFactory(
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index b880507d..fbabafff 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -85,8 +85,8 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.version}</version>
+            <artifactId>flink-table-store-format</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 0d01be5d..43065064 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -27,8 +27,8 @@ import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -219,13 +219,11 @@ public class FileStoreOptions implements Serializable {
     }
 
     public FileFormat fileFormat() {
-        return FileFormat.fromTableOptions(
-                Thread.currentThread().getContextClassLoader(), options, FILE_FORMAT);
+        return FileFormat.fromTableOptions(options, FILE_FORMAT);
     }
 
     public FileFormat manifestFormat() {
-        return FileFormat.fromTableOptions(
-                Thread.currentThread().getContextClassLoader(), options, MANIFEST_FORMAT);
+        return FileFormat.fromTableOptions(options, MANIFEST_FORMAT);
     }
 
     public MemorySize manifestTargetSize() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 8451064d..8a7105e7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.stats.BinaryTableStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -32,6 +31,7 @@ import org.apache.flink.table.store.file.writer.Metric;
 import org.apache.flink.table.store.file.writer.MetricFileWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index 098cf430..45344989 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -27,11 +27,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
index 91ac96d9..4f348cc9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
@@ -26,11 +26,8 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.stats.BinaryTableStats;
-import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.writer.BaseFileWriter;
@@ -38,6 +35,9 @@ import org.apache.flink.table.store.file.writer.FileWriter;
 import org.apache.flink.table.store.file.writer.Metric;
 import org.apache.flink.table.store.file.writer.MetricFileWriter;
 import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CloseableIterator;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
deleted file mode 100644
index ce7a698f..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.format;
-
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.ServiceLoader;
-
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-
-/**
- * Factory class which creates reader and writer factories for specific file format.
- *
- * <p>NOTE: This class must be thread safe.
- */
-public abstract class FileFormat {
-
-    protected String formatIdentifier;
-
-    protected FileFormat(String formatIdentifier) {
-        this.formatIdentifier = formatIdentifier;
-    }
-
-    protected abstract BulkDecodingFormat<RowData> getDecodingFormat();
-
-    protected abstract EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat();
-
-    public String getFormatIdentifier() {
-        return formatIdentifier;
-    }
-
-    /**
-     * Create a {@link BulkFormat} from the type, with projection pushed down.
-     *
-     * @param type Type without projection.
-     * @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
-     * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
-     */
-    @SuppressWarnings("unchecked")
-    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
-        BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
-        // TODO use ProjectingBulkFormat if not supported
-        Preconditions.checkState(
-                decodingFormat instanceof ProjectableDecodingFormat,
-                "Format "
-                        + decodingFormat.getClass().getName()
-                        + " does not support projection push down");
-        decodingFormat.applyFilters(filters);
-        return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
-                .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(type), projection);
-    }
-
-    /** Create a {@link BulkWriter.Factory} from the type. */
-    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
-        return getEncodingFormat().createRuntimeEncoder(SINK_CONTEXT, fromLogicalToDataType(type));
-    }
-
-    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
-        int[][] projection = new int[rowType.getFieldCount()][];
-        for (int i = 0; i < projection.length; i++) {
-            projection[i] = new int[] {i};
-        }
-        return createReaderFactory(rowType, projection);
-    }
-
-    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType rowType, int[][] projection) {
-        return createReaderFactory(rowType, projection, new ArrayList<>());
-    }
-
-    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
-        return Optional.empty();
-    }
-
-    /** Create a {@link FileFormatImpl} from table options. */
-    public static FileFormat fromTableOptions(
-            ClassLoader classLoader,
-            Configuration tableOptions,
-            ConfigOption<String> formatOption) {
-        String formatIdentifier = tableOptions.get(formatOption);
-        DelegatingConfiguration formatOptions =
-                new DelegatingConfiguration(tableOptions, formatIdentifier + ".");
-        return fromIdentifier(classLoader, formatIdentifier, formatOptions);
-    }
-
-    /** Create a {@link FileFormatImpl} from format identifier and format options. */
-    public static FileFormat fromIdentifier(
-            ClassLoader classLoader, String formatIdentifier, Configuration formatOptions) {
-        ServiceLoader<FileFormatFactory> serviceLoader =
-                ServiceLoader.load(FileFormatFactory.class);
-        for (FileFormatFactory factory : serviceLoader) {
-            if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
-                return factory.create(formatOptions);
-            }
-        }
-        return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
-    }
-
-    private static final DynamicTableSink.Context SINK_CONTEXT =
-            new DynamicTableSink.Context() {
-
-                @Override
-                public boolean isBounded() {
-                    return false;
-                }
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
-                    return InternalTypeInfo.of(consumedDataType.getLogicalType());
-                }
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(
-                        LogicalType consumedLogicalType) {
-                    return InternalTypeInfo.of(consumedLogicalType);
-                }
-
-                @Override
-                public DynamicTableSink.DataStructureConverter createDataStructureConverter(
-                        DataType consumedDataType) {
-                    throw new UnsupportedOperationException();
-                }
-            };
-
-    private static final DynamicTableSource.Context SOURCE_CONTEXT =
-            new DynamicTableSource.Context() {
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
-                    return InternalTypeInfo.of(consumedDataType.getLogicalType());
-                }
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(
-                        LogicalType producedLogicalType) {
-                    return InternalTypeInfo.of(producedLogicalType);
-                }
-
-                @Override
-                public DynamicTableSource.DataStructureConverter createDataStructureConverter(
-                        DataType consumedDataType) {
-                    throw new UnsupportedOperationException();
-                }
-            };
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
deleted file mode 100644
index ae2e27a0..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.format;
-
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
-import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
-
-/** A {@link FileFormat} which discovers reader and writer from format identifier. */
-public class FileFormatImpl extends FileFormat {
-
-    private final BulkReaderFormatFactory readerFactory;
-    private final BulkWriterFormatFactory writerFactory;
-    private final ReadableConfig formatOptions;
-
-    public FileFormatImpl(
-            ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
-        super(formatIdentifier);
-        this.readerFactory =
-                FactoryUtil.discoverFactory(
-                        classLoader, BulkReaderFormatFactory.class, formatIdentifier);
-        this.writerFactory =
-                FactoryUtil.discoverFactory(
-                        classLoader, BulkWriterFormatFactory.class, formatIdentifier);
-        this.formatOptions = formatOptions;
-    }
-
-    protected BulkDecodingFormat<RowData> getDecodingFormat() {
-        return readerFactory.createDecodingFormat(null, formatOptions); // context is useless
-    }
-
-    @Override
-    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
-        return writerFactory.createEncodingFormat(null, formatOptions); // context is useless
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index f77c697b..3e3c4f35 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -24,10 +24,8 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
@@ -36,6 +34,9 @@ import org.apache.flink.table.store.file.writer.FileWriter;
 import org.apache.flink.table.store.file.writer.Metric;
 import org.apache.flink.table.store.file.writer.MetricFileWriter;
 import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.format.FieldStatsCollector;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
@@ -153,6 +154,7 @@ public class ManifestFile {
     private class ManifestEntryWriter extends BaseFileWriter<ManifestEntry, ManifestFileMeta> {
 
         private final FieldStatsCollector partitionStatsCollector;
+        private final FieldStatsArraySerializer partitionStatsSerializer;
         private long numAddedFiles = 0;
         private long numDeletedFiles = 0;
 
@@ -161,6 +163,7 @@ public class ManifestFile {
             super(writerFactory, path);
 
             this.partitionStatsCollector = new FieldStatsCollector(partitionType);
+            this.partitionStatsSerializer = new FieldStatsArraySerializer(partitionType);
         }
 
         @Override
@@ -190,7 +193,7 @@ public class ManifestFile {
                     path.getFileSystem().getFileStatus(path).getLen(),
                     numAddedFiles,
                     numDeletedFiles,
-                    partitionStatsCollector.extract(),
+                    partitionStatsSerializer.toBinary(partitionStatsCollector.extract()),
                     schemaId);
         }
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 8d3cc827..41a25b98 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -25,10 +25,10 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 3fbb2653..dbec3d80 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -31,8 +31,8 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 346cac87..20b2805e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.data.AppendOnlyReader;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 29cf3177..7d8cc440 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -23,11 +23,11 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.AppendOnlyWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index f6a1404e..58b5739f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -34,8 +34,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateConverter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.MetaFileWriter;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index f9eb3b2a..8deb78f3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
@@ -31,6 +30,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index c546a5d7..0063c752 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
@@ -42,6 +41,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
index 313ffe77..dc256767 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
index 4be05b2a..d823da0d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 
 import java.io.Serializable;
 import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
index cf9a0f99..a59736ab 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
index e93857e1..4a59066d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
index 3e771ae0..7d517460 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index 093ad663..1095112a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index 5aa48423..fde5197d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
index 0b91cb70..3f2b6c73 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
index 97bfcebd..00c852cc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.io.Serializable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 2e8d6c81..17a52f76 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
index aa414c68..72dcfbb1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
index 784fef38..72382a53 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
index 5ef21753..46c02a84 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
index 71379516..2d3002ca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
index 5a74bd6a..9ba91286 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
index 7e7c1faa..3f6748d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 
 import java.io.Serializable;
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
index cf1b3db2..10b5faf6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.data.binary.BinaryStringData;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
index 5549358c..9651bcb9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.format.FieldStats;
 
 import javax.annotation.Nullable;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 5a5a48a0..4ae0ce3c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -20,7 +20,8 @@ package org.apache.flink.table.store.file.stats;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
index f80381d0..b41545c3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.table.store.file.writer;
 
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 
 /** Metric information to describe the column's max-min values, record count etc. */
 public class Metric {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
index 676e096a..26e26953 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
@@ -24,10 +24,10 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FieldStatsCollector;
+import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -127,7 +127,7 @@ public class MetricFileWriter<T> implements FileWriter<T, Metric> {
         if (fileStatsExtractor != null) {
             stats = fileStatsExtractor.extract(path);
         } else {
-            stats = fieldStatsCollector.extractFieldStats();
+            stats = fieldStatsCollector.extract();
         }
 
         return new Metric(stats, recordCount, length);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
index ce08143b..e46fd09f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
@@ -28,12 +28,10 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.format.FileFormatImpl;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.avro.AvroRuntimeException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -44,7 +42,7 @@ import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link FileFormatImpl}. */
+/** Test for {@link FileFormat}. */
 public class FileFormatTest {
 
     @Test
@@ -89,7 +87,7 @@ public class FileFormatTest {
                 createFileFormat("_unsupported").createWriterFactory(RowType.of(new IntType()));
         Path path = new Path(tempDir.toUri().toString(), "1.avro");
         Assertions.assertThrows(
-                AvroRuntimeException.class,
+                RuntimeException.class,
                 () ->
                         writerFactory.create(
                                 path.getFileSystem()
@@ -101,7 +99,6 @@ public class FileFormatTest {
         Configuration tableOptions = new Configuration();
         tableOptions.set(FileStoreOptions.FILE_FORMAT, "avro");
         tableOptions.setString("avro.codec", codec);
-        return FileFormat.fromTableOptions(
-                this.getClass().getClassLoader(), tableOptions, FileStoreOptions.FILE_FORMAT);
+        return FileFormat.fromTableOptions(tableOptions, FileStoreOptions.FILE_FORMAT);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index 774676ad..5cfb2f73 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -26,11 +26,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -220,9 +220,7 @@ public class AppendOnlyWriterTest {
 
     private RecordWriter<RowData> createWriter(
             long targetFileSize, RowType writeSchema, long maxSeqNum) {
-        FileFormat fileFormat =
-                FileFormat.fromIdentifier(
-                        Thread.currentThread().getContextClassLoader(), AVRO, new Configuration());
+        FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration());
 
         return new AppendOnlyWriter(
                 0, fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
index 86345713..b5369dca 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
@@ -21,7 +21,8 @@ package org.apache.flink.table.store.file.data;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.format.FieldStatsCollector;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -103,6 +104,11 @@ public class DataFileTestDataGenerator {
                 new FieldStatsCollector(TestKeyValueGenerator.KEY_TYPE);
         FieldStatsCollector valueStatsCollector =
                 new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
+        FieldStatsArraySerializer keyStatsSerializer =
+                new FieldStatsArraySerializer(TestKeyValueGenerator.KEY_TYPE);
+        FieldStatsArraySerializer valueStatsSerializer =
+                new FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
+
         long totalSize = 0;
         BinaryRowData minKey = null;
         BinaryRowData maxKey = null;
@@ -133,8 +139,8 @@ public class DataFileTestDataGenerator {
                         kvs.size(),
                         minKey,
                         maxKey,
-                        keyStatsCollector.extract(),
-                        valueStatsCollector.extract(),
+                        keyStatsSerializer.toBinary(keyStatsCollector.extract()),
+                        valueStatsSerializer.toBinary(valueStatsCollector.extract()),
                         minSequenceNumber,
                         maxSequenceNumber,
                         0,
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
similarity index 81%
rename from flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 75ba824c..2288c931 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.format;
+package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -27,16 +27,15 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.data.DataFileTest;
 import org.apache.flink.table.store.file.data.DataFileWriter;
-import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.List;
 
@@ -48,21 +47,16 @@ public class FileFormatSuffixTest extends DataFileTest {
                     new LogicalType[] {new IntType(), new VarCharType(), new VarCharType()},
                     new String[] {"id", "name", "dt"});
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro", "parquet", "orc"})
-    public void testFileSuffix(String format, @TempDir java.nio.file.Path tempDir)
-            throws Exception {
+    @Test
+    public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception {
+        String format = "avro";
         DataFileWriter dataFileWriter = createDataFileWriter(tempDir.toString(), format);
         Path path = dataFileWriter.pathFactory().newPath();
         Assertions.assertTrue(path.getPath().endsWith(format));
 
         DataFilePathFactory dataFilePathFactory =
                 new DataFilePathFactory(new Path(tempDir.toString()), "dt=1", 1, format);
-        FileFormat fileFormat =
-                FileFormat.fromIdentifier(
-                        Thread.currentThread().getContextClassLoader(),
-                        format,
-                        new Configuration());
+        FileFormat fileFormat = FileFormat.fromIdentifier(format, new Configuration());
         AppendOnlyWriter appendOnlyWriter =
                 new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
         appendOnlyWriter.write(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 0bbec2e3..29ebd0c3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -18,18 +18,39 @@
 
 package org.apache.flink.table.store.file.format;
 
+import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.List;
 import java.util.Optional;
 
 /** An avro {@link FileFormat} for test. It provides a {@link FileStatsExtractor}. */
-public class FileStatsExtractingAvroFormat extends FileFormatImpl {
+public class FileStatsExtractingAvroFormat extends FileFormat {
+
+    private final FileFormat avro;
 
     public FileStatsExtractingAvroFormat() {
-        super(FileStatsExtractingAvroFormat.class.getClassLoader(), "avro", new Configuration());
+        super("avro");
+        avro = FileFormat.fromIdentifier("avro", new Configuration());
+    }
+
+    @Override
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        return avro.createReaderFactory(type, projection, filters);
+    }
+
+    @Override
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+        return avro.createWriterFactory(type);
     }
 
     @Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
index 9c841766..ee89402a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatFactory;
 
 /** Factory to create {@link FileStatsExtractingAvroFormat}. */
 public class FileStatsExtractingAvroFormatFactory implements FileFormatFactory {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index 874bec22..23569b21 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -20,12 +20,15 @@ package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
+import java.util.List;
 
 /** A special {@link FileFormat} which flushes for every added element. */
 public class FlushingFileFormat extends FileFormat {
@@ -34,26 +37,20 @@ public class FlushingFileFormat extends FileFormat {
 
     public FlushingFileFormat(String identifier) {
         super(identifier);
-        this.format =
-                FileFormat.fromIdentifier(
-                        FlushingFileFormat.class.getClassLoader(), identifier, new Configuration());
+        this.format = FileFormat.fromIdentifier(identifier, new Configuration());
     }
 
     @Override
-    protected BulkDecodingFormat<RowData> getDecodingFormat() {
-        return format.getDecodingFormat();
-    }
-
-    @Override
-    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
-        return format.getEncodingFormat();
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        return format.createReaderFactory(type, projection, filters);
     }
 
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
         return fsDataOutputStream -> {
             BulkWriter<RowData> wrapped =
-                    super.createWriterFactory(type).create(fsDataOutputStream);
+                    format.createWriterFactory(type).create(fsDataOutputStream);
             return new BulkWriter<RowData>() {
                 @Override
                 public void addElement(RowData rowData) throws IOException {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index cf89de9b..43811b7a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -61,9 +61,7 @@ public class ManifestFileMetaTest {
     private ManifestFile manifestFile;
 
     public ManifestFileMetaTest() {
-        this.avro =
-                FileFormat.fromIdentifier(
-                        ManifestFileMetaTest.class.getClassLoader(), "avro", new Configuration());
+        this.avro = FileFormat.fromIdentifier("avro", new Configuration());
     }
 
     @BeforeEach
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 1f6cf287..3ad75724 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -22,11 +22,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
 
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
@@ -44,9 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ManifestFileTest {
 
     private final ManifestTestDataGenerator gen = ManifestTestDataGenerator.builder().build();
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    ManifestFileTest.class.getClassLoader(), "avro", new Configuration());
+    private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
 
     @TempDir java.nio.file.Path tempDir;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 38a4d68d..6064c1cf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -23,9 +23,9 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
 
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
@@ -41,9 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ManifestListTest {
 
     private final ManifestTestDataGenerator gen = ManifestTestDataGenerator.builder().build();
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    ManifestListTest.class.getClassLoader(), "avro", new Configuration());
+    private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
 
     @TempDir java.nio.file.Path tempDir;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index 2eff6284..a2cfa068 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -22,7 +22,8 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.data.DataFileTestDataGenerator;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.format.FieldStatsCollector;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -83,6 +84,8 @@ public class ManifestTestDataGenerator {
 
         FieldStatsCollector collector =
                 new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+        FieldStatsArraySerializer serializer =
+                new FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
 
         long numAddedFiles = 0;
         long numDeletedFiles = 0;
@@ -100,7 +103,7 @@ public class ManifestTestDataGenerator {
                 entries.size() * 100L,
                 numAddedFiles,
                 numDeletedFiles,
-                collector.extract(),
+                serializer.toBinary(collector.extract()),
                 0);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 69ca3a6f..96f4554e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.format.FlushingFileFormat;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -43,6 +42,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 3d396ee5..45617789 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
index c3d54cf8..62bf433d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.file.stats;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FieldStatsCollector;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -44,7 +46,7 @@ public class FieldStatsCollectorTest {
                         1,
                         StringData.fromString("Flink"),
                         new GenericArrayData(new int[] {1, 10})));
-        assertThat(collector.extractFieldStats())
+        assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
                             new FieldStats(1, 1, 0),
@@ -56,7 +58,7 @@ public class FieldStatsCollectorTest {
                         });
 
         collector.collect(GenericRowData.of(3, null, new GenericArrayData(new int[] {3, 30})));
-        assertThat(collector.extractFieldStats())
+        assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
                             new FieldStats(1, 3, 0),
@@ -72,7 +74,7 @@ public class FieldStatsCollectorTest {
                         null,
                         StringData.fromString("Apache"),
                         new GenericArrayData(new int[] {2, 20})));
-        assertThat(collector.extractFieldStats())
+        assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
                             new FieldStats(1, 3, 1),
@@ -84,7 +86,7 @@ public class FieldStatsCollectorTest {
                         });
 
         collector.collect(GenericRowData.of(2, StringData.fromString("Batch"), null));
-        assertThat(collector.extractFieldStats())
+        assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
                             new FieldStats(1, 3, 1),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
index 1bcc955e..e099e9ec 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.stats;
 
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
index 564855d9..4d854aef 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
@@ -22,9 +22,12 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FieldStatsCollector;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
@@ -53,7 +56,7 @@ public class TestFileStatsExtractor implements FileStatsExtractor {
         for (RowData record : records) {
             statsCollector.collect(record);
         }
-        return statsCollector.extractFieldStats();
+        return statsCollector.extract();
     }
 
     private static class IdentityObjectSerializer extends ObjectSerializer<RowData> {
diff --git a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
similarity index 100%
rename from flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
rename to flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
deleted file mode 100644
index 0ceee859..00000000
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.tests;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.UUID;
-
-/**
- * Test that file store supports format not included in flink-table-store-format but provided by
- * Flink itself.
- */
-public class FileStoreFlinkFormatE2eTest extends E2eTestBase {
-
-    @Test
-    public void testCsv() throws Exception {
-        String tableStoreDdl =
-                "CREATE TABLE IF NOT EXISTS table_store (\n"
-                        + "    a INT,\n"
-                        + "    b VARCHAR\n"
-                        + ") WITH (\n"
-                        + "    'bucket' = '3',\n"
-                        + "    'root-path' = '%s',\n"
-                        + "    'file.format' = 'csv'\n"
-                        + ");";
-        tableStoreDdl =
-                String.format(
-                        tableStoreDdl,
-                        TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store");
-
-        runSql("INSERT INTO table_store VALUES (1, 'Hi'), (2, 'Hello');", tableStoreDdl);
-        runSql(
-                "INSERT INTO result1 SELECT * FROM table_store;",
-                tableStoreDdl,
-                createResultSink("result1", "a INT, b VARCHAR"));
-        checkResult("1, Hi", "2, Hello");
-    }
-
-    private void runSql(String sql, String... ddls) throws Exception {
-        runSql(
-                "SET 'execution.runtime-mode' = 'batch';\n"
-                        + "SET 'table.dml-sync' = 'true';\n"
-                        + String.join("\n", ddls)
-                        + "\n"
-                        + sql);
-    }
-}
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index 1b582ec9..80fb0611 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-core</artifactId>
+            <artifactId>flink-table-store-common</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -43,23 +43,30 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
+            <artifactId>flink-table-common</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
+            <artifactId>flink-table-runtime</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
+        <!-- format dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-avro</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
         </dependency>
 
         <dependency>
@@ -80,7 +87,6 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-orc</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
         </dependency>
 
         <dependency>
@@ -101,7 +107,6 @@ under the License.
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
-            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.avro</groupId>
@@ -122,16 +127,9 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-core</artifactId>
+            <artifactId>flink-table-store-common</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
             <type>test-jar</type>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-runtime</artifactId>
-            <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
 
@@ -238,6 +236,10 @@ under the License.
                                 </filter>
                             </filters>
                             <relocations>
+                                <relocation>
+                                    <pattern>org.apache.avro</pattern>
+                                    <shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.flink.formats.avro</pattern>
                                     <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.avro</shadedPattern>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index 9afe772e..4a778a63 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -19,31 +19,161 @@
 package org.apache.flink.table.store.format.avro;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.avro.AbstractAvroBulkFormat;
+import org.apache.flink.formats.avro.AvroBuilder;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.AvroWriterFactory;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
-/** Avro {@link FileFormat}. */
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
+
+/** Avro {@link FileFormat}. The main code is copied from Flink {@code AvroFileFormatFactory}. */
 public class AvroFileFormat extends FileFormat {
 
-    private final org.apache.flink.formats.avro.AvroFileFormatFactory factory;
     private final ReadableConfig formatOptions;
 
     public AvroFileFormat(ReadableConfig formatOptions) {
         super(org.apache.flink.formats.avro.AvroFileFormatFactory.IDENTIFIER);
-        this.factory = new org.apache.flink.formats.avro.AvroFileFormatFactory();
         this.formatOptions = formatOptions;
     }
 
     @Override
-    protected BulkDecodingFormat<RowData> getDecodingFormat() {
-        return factory.createDecodingFormat(null, formatOptions); // context is useless
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        // avro is a file format that keeps schemas in file headers,
+        // if the schema given to the reader is not equal to the schema in header,
+        // reader will automatically map the fields and give back records with our desired
+        // schema
+        //
+        // for detailed discussion see comments in https://github.com/apache/flink/pull/18657
+        LogicalType producedType = Projection.of(projection).project(type);
+        return new AvroGenericRecordBulkFormat(
+                (RowType) producedType.copy(false), InternalTypeInfo.of(producedType));
     }
 
     @Override
-    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
-        return factory.createEncodingFormat(null, formatOptions); // context is useless
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+        return new RowDataAvroWriterFactory(type, formatOptions.get(AVRO_OUTPUT_CODEC));
+    }
+
+    private static class AvroGenericRecordBulkFormat
+            extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final RowType producedRowType;
+        private final TypeInformation<RowData> producedTypeInfo;
+
+        public AvroGenericRecordBulkFormat(
+                RowType producedRowType, TypeInformation<RowData> producedTypeInfo) {
+            super(AvroSchemaConverter.convertToSchema(producedRowType));
+            this.producedRowType = producedRowType;
+            this.producedTypeInfo = producedTypeInfo;
+        }
+
+        @Override
+        protected GenericRecord createReusedAvroRecord() {
+            return new GenericData.Record(readerSchema);
+        }
+
+        @Override
+        protected Function<GenericRecord, RowData> createConverter() {
+            AvroToRowDataConverters.AvroToRowDataConverter converter =
+                    AvroToRowDataConverters.createRowConverter(producedRowType);
+            return record -> record == null ? null : (GenericRowData) converter.convert(record);
+        }
+
+        @Override
+        public TypeInformation<RowData> getProducedType() {
+            return producedTypeInfo;
+        }
+    }
+
+    /**
+     * A {@link BulkWriter.Factory} to convert {@link RowData} to {@link GenericRecord} and wrap
+     * {@link AvroWriterFactory}.
+     */
+    private static class RowDataAvroWriterFactory implements BulkWriter.Factory<RowData> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final AvroWriterFactory<GenericRecord> factory;
+        private final RowType rowType;
+
+        private RowDataAvroWriterFactory(RowType rowType, String codec) {
+            this.rowType = rowType;
+            this.factory =
+                    new AvroWriterFactory<>(
+                            new AvroBuilder<GenericRecord>() {
+                                @Override
+                                public DataFileWriter<GenericRecord> createWriter(OutputStream out)
+                                        throws IOException {
+                                    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+                                    DatumWriter<GenericRecord> datumWriter =
+                                            new GenericDatumWriter<>(schema);
+                                    DataFileWriter<GenericRecord> dataFileWriter =
+                                            new DataFileWriter<>(datumWriter);
+
+                                    if (codec != null) {
+                                        dataFileWriter.setCodec(CodecFactory.fromString(codec));
+                                    }
+                                    dataFileWriter.create(schema, out);
+                                    return dataFileWriter;
+                                }
+                            });
+        }
+
+        @Override
+        public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
+            BulkWriter<GenericRecord> writer = factory.create(out);
+            RowDataToAvroConverters.RowDataToAvroConverter converter =
+                    RowDataToAvroConverters.createConverter(rowType);
+            Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+            return new BulkWriter<RowData>() {
+
+                @Override
+                public void addElement(RowData element) throws IOException {
+                    GenericRecord record = (GenericRecord) converter.convert(schema, element);
+                    writer.addElement(record);
+                }
+
+                @Override
+                public void flush() throws IOException {
+                    writer.flush();
+                }
+
+                @Override
+                public void finish() throws IOException {
+                    writer.finish();
+                }
+            };
+        }
     }
 }
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
index 26c5ad56..c541eaa7 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.store.format.avro;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.format.FileFormatFactory;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatFactory;
 
 /** Factory to create {@link AvroFileFormat}. */
 public class AvroFileFormatFactory implements FileFormatFactory {
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index c9066946..b5fd5c5e 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -21,24 +21,43 @@ package org.apache.flink.table.store.format.orc;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.OrcSplitReaderUtil;
+import org.apache.flink.orc.shim.OrcShim;
+import org.apache.flink.orc.vector.RowDataVectorizer;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.orc.TypeDescription;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
 
-/** Orc {@link FileFormat}. */
+/** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
 public class OrcFileFormat extends FileFormat {
 
-    private final org.apache.flink.orc.OrcFileFormatFactory factory;
     private final Configuration formatOptions;
 
     public OrcFileFormat(Configuration formatOptions) {
         super(org.apache.flink.orc.OrcFileFormatFactory.IDENTIFIER);
-        this.factory = new org.apache.flink.orc.OrcFileFormatFactory();
         this.formatOptions = formatOptions;
     }
 
@@ -48,17 +67,57 @@ public class OrcFileFormat extends FileFormat {
     }
 
     @Override
-    protected BulkDecodingFormat<RowData> getDecodingFormat() {
-        return factory.createDecodingFormat(null, formatOptions); // context is useless
+    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+        return Optional.of(new OrcFileStatsExtractor(type));
     }
 
     @Override
-    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
-        return factory.createEncodingFormat(null, formatOptions); // context is useless
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
+
+        if (filters != null) {
+            for (Expression pred : filters) {
+                OrcFilters.Predicate orcPred = OrcFilters.toOrcPredicate(pred);
+                if (orcPred != null) {
+                    orcPredicates.add(orcPred);
+                }
+            }
+        }
+
+        Properties properties = getOrcProperties(formatOptions);
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
+
+        return OrcColumnarRowInputFormat.createPartitionedFormat(
+                OrcShim.defaultShim(),
+                conf,
+                type,
+                Collections.emptyList(),
+                null,
+                Projection.of(projection).toTopLevelIndexes(),
+                orcPredicates,
+                VectorizedColumnBatch.DEFAULT_SIZE,
+                InternalTypeInfo::of);
     }
 
     @Override
-    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
-        return Optional.of(new OrcFileStatsExtractor(type));
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+        LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
+
+        TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(type);
+
+        return new OrcBulkWriterFactory<>(
+                new RowDataVectorizer(typeDescription.toString(), orcTypes),
+                getOrcProperties(formatOptions),
+                new org.apache.hadoop.conf.Configuration());
+    }
+
+    private static Properties getOrcProperties(ReadableConfig options) {
+        Properties orcProperties = new Properties();
+        Properties properties = new Properties();
+        ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
+        properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v));
+        return orcProperties;
     }
 }
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
index 9e544e49..d580ce61 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
@@ -19,16 +19,18 @@
 package org.apache.flink.table.store.format.orc;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.format.FileFormatFactory;
+import org.apache.flink.table.store.format.FileFormatFactory;
 
 import java.util.Properties;
 
 /** Factory to create {@link OrcFileFormat}. */
 public class OrcFileFormatFactory implements FileFormatFactory {
 
+    public static final String IDENTIFIER = "orc";
+
     @Override
     public String identifier() {
-        return "orc";
+        return IDENTIFIER;
     }
 
     @Override
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
index 1acd68d6..044a26e1 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
@@ -22,8 +22,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.DateTimeUtils;
diff --git a/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
similarity index 100%
rename from flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
rename to flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java
index 91e70b81..eefcec76 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -55,22 +53,10 @@ public class BulkFileFormatTest {
         testFormatWriteRead(tempDir, "orc", "snappy");
     }
 
-    @Test
-    public void testParquet(@TempDir java.nio.file.Path tempDir) throws IOException {
-        testFormatWriteRead(tempDir, "parquet", "snappy");
-    }
-
-    @Test
-    public void testCsv(@TempDir java.nio.file.Path tempDir) throws IOException {
-        testFormatWriteRead(tempDir, "csv", "snappy");
-    }
-
     public FileFormat createFileFormat(String format, String codec) {
         Configuration tableOptions = new Configuration();
-        tableOptions.set(FileStoreOptions.FILE_FORMAT, format);
         tableOptions.setString(format + ".codec", codec);
-        return FileFormat.fromTableOptions(
-                this.getClass().getClassLoader(), tableOptions, FileStoreOptions.FILE_FORMAT);
+        return FileFormat.fromIdentifier(format, tableOptions);
     }
 
     public void testFormatWriteRead(
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 1ee12ccc..eaac4b29 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.store.format.orc;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.stats.FileStatsExtractorTestBase;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
@@ -41,8 +41,7 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
 
     @Override
     protected FileFormat createFormat() {
-        return FileFormat.fromIdentifier(
-                OrcFileStatsExtractorTest.class.getClassLoader(), "orc", new Configuration());
+        return FileFormat.fromIdentifier("orc", new Configuration());
     }
 
     @Override