You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/06 01:43:59 UTC

[flink-table-store] branch master updated: [FLINK-30569] File Format can not change with data file exists

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 63a27cd7 [FLINK-30569] File Format can not change with data file exists
63a27cd7 is described below

commit 63a27cd7af945839f67afaf6e946bcf617ad18a2
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jan 6 09:43:54 2023 +0800

    [FLINK-30569] File Format can not change with data file exists
    
    This closes #459
---
 .../file/mergetree/MergeTreeBenchmark.java         |  2 +-
 .../source/TestChangelogDataReadWrite.java         |  2 +-
 .../org/apache/flink/table/store/CoreOptions.java  |  4 ++
 .../table/store/file/AppendOnlyFileStore.java      |  3 +-
 .../flink/table/store/file/KeyValueFileStore.java  |  3 +-
 .../table/store/file/io/DataFilePathFactory.java   |  9 ++++
 .../store/file/io/KeyValueFileReaderFactory.java   | 31 +++++++++-----
 .../file/operation/AppendOnlyFileStoreRead.java    | 24 ++++++-----
 .../file/operation/KeyValueFileStoreRead.java      |  6 +--
 .../file/operation/KeyValueFileStoreWrite.java     |  3 +-
 .../table/store/file/utils/BulkFormatMapping.java  | 19 ++++----
 .../table/store/format/FileFormatDiscover.java     | 45 +++++++++++++++++++
 .../apache/flink/table/store/format/FormatKey.java | 50 ++++++++++++++++++++++
 .../store/file/io/KeyValueFileReadWriteTest.java   |  4 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  2 +-
 .../table/store/table/FileStoreTableTestBase.java  | 35 +++++++++++++++
 16 files changed, 202 insertions(+), 40 deletions(-)

diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
index 70304ee3..a01ab499 100644
--- a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
@@ -144,7 +144,7 @@ public class MergeTreeBenchmark {
                         0,
                         keyType,
                         valueType,
-                        flushingFormat,
+                        ignore -> flushingFormat,
                         pathFactory,
                         new KeyValueFieldsExtractor() {
                             @Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 3df55343..514ea55a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -127,7 +127,7 @@ public class TestChangelogDataReadWrite {
                         VALUE_TYPE,
                         COMPARATOR,
                         DeduplicateMergeFunction.factory(),
-                        avro,
+                        ignore -> avro,
                         pathFactory,
                         EXTRACTOR);
         return new KeyValueTableRead(read) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index ba61e105..368620d6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -405,6 +405,10 @@ public class CoreOptions implements Serializable {
         this.options = options;
     }
 
+    public Configuration toConfiguration() {
+        return options;
+    }
+
     public Map<String, String> toMap() {
         return options.toMap();
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 0d889213..e59a82b6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreWrite;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.format.FileFormatDiscover;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Comparator;
@@ -54,7 +55,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
     @Override
     public AppendOnlyFileStoreRead newRead() {
         return new AppendOnlyFileStoreRead(
-                schemaManager, schemaId, rowType, options.fileFormat(), pathFactory());
+                schemaManager, schemaId, rowType, FileFormatDiscover.of(options), pathFactory());
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 780eabb4..06e88064 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
 import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
+import org.apache.flink.table.store.format.FileFormatDiscover;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Comparator;
@@ -77,7 +78,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 valueType,
                 newKeyComparator(),
                 mfFactory,
-                options.fileFormat(),
+                FileFormatDiscover.of(options),
                 pathFactory(),
                 keyValueFieldsExtractor);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java
index f762963c..4c7019b0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java
@@ -73,4 +73,13 @@ public class DataFilePathFactory {
     public static Path bucketPath(Path tablePath, String partition, int bucket) {
         return new Path(tablePath + "/" + partition + "/bucket-" + bucket);
     }
+
+    public static String formatIdentifier(String fileName) {
+        int index = fileName.lastIndexOf('.');
+        if (index == -1) {
+            throw new IllegalArgumentException(fileName + " is not a legal file name.");
+        }
+
+        return fileName.substring(index + 1);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
index 424847dd..22152a74 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
@@ -27,7 +27,8 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.BulkFormatMapping;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
+import org.apache.flink.table.store.format.FormatKey;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -48,7 +49,7 @@ public class KeyValueFileReaderFactory {
     private final RowType valueType;
 
     private final BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder;
-    private final Map<Long, BulkFormatMapping> bulkFormatMappings;
+    private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final DataFilePathFactory pathFactory;
 
     private KeyValueFileReaderFactory(
@@ -69,13 +70,15 @@ public class KeyValueFileReaderFactory {
 
     public RecordReader<KeyValue> createRecordReader(long schemaId, String fileName, int level)
             throws IOException {
+        String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);
         BulkFormatMapping bulkFormatMapping =
                 bulkFormatMappings.computeIfAbsent(
-                        schemaId,
+                        new FormatKey(schemaId, formatIdentifier),
                         key -> {
                             TableSchema tableSchema = schemaManager.schema(this.schemaId);
-                            TableSchema dataSchema = schemaManager.schema(key);
-                            return bulkFormatMappingBuilder.build(tableSchema, dataSchema);
+                            TableSchema dataSchema = schemaManager.schema(key.schemaId);
+                            return bulkFormatMappingBuilder.build(
+                                    formatIdentifier, tableSchema, dataSchema);
                         });
         return new KeyValueDataFileRecordReader(
                 bulkFormatMapping.getReaderFactory(),
@@ -91,11 +94,17 @@ public class KeyValueFileReaderFactory {
             long schemaId,
             RowType keyType,
             RowType valueType,
-            FileFormat fileFormat,
+            FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
             KeyValueFieldsExtractor extractor) {
         return new Builder(
-                schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory, extractor);
+                schemaManager,
+                schemaId,
+                keyType,
+                valueType,
+                formatDiscover,
+                pathFactory,
+                extractor);
     }
 
     /** Builder for {@link KeyValueFileReaderFactory}. */
@@ -105,7 +114,7 @@ public class KeyValueFileReaderFactory {
         private final long schemaId;
         private final RowType keyType;
         private final RowType valueType;
-        private final FileFormat fileFormat;
+        private final FileFormatDiscover formatDiscover;
         private final FileStorePathFactory pathFactory;
         private final KeyValueFieldsExtractor extractor;
 
@@ -120,14 +129,14 @@ public class KeyValueFileReaderFactory {
                 long schemaId,
                 RowType keyType,
                 RowType valueType,
-                FileFormat fileFormat,
+                FileFormatDiscover formatDiscover,
                 FileStorePathFactory pathFactory,
                 KeyValueFieldsExtractor extractor) {
             this.schemaManager = schemaManager;
             this.schemaId = schemaId;
             this.keyType = keyType;
             this.valueType = valueType;
-            this.fileFormat = fileFormat;
+            this.formatDiscover = formatDiscover;
             this.pathFactory = pathFactory;
             this.extractor = extractor;
 
@@ -167,7 +176,7 @@ public class KeyValueFileReaderFactory {
                     projectedKeyType,
                     projectedValueType,
                     BulkFormatMapping.newBuilder(
-                            fileFormat, extractor, keyProjection, valueProjection, filters),
+                            formatDiscover, extractor, keyProjection, valueProjection, filters),
                     pathFactory.createDataFilePathFactory(partition, bucket));
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 2dc3038a..3e99d8db 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -30,7 +30,8 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.BulkFormatMapping;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
+import org.apache.flink.table.store.format.FormatKey;
 import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.RowType;
@@ -51,9 +52,9 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
     private final SchemaManager schemaManager;
     private final long schemaId;
     private final RowType rowType;
-    private final FileFormat fileFormat;
+    private final FileFormatDiscover formatDiscover;
     private final FileStorePathFactory pathFactory;
-    private final Map<Long, BulkFormatMapping> bulkFormatMappings;
+    private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
 
     private int[][] projection;
 
@@ -63,12 +64,12 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
             SchemaManager schemaManager,
             long schemaId,
             RowType rowType,
-            FileFormat fileFormat,
+            FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory) {
         this.schemaManager = schemaManager;
         this.schemaId = schemaId;
         this.rowType = rowType;
-        this.fileFormat = fileFormat;
+        this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
         this.bulkFormatMappings = new HashMap<>();
 
@@ -92,12 +93,13 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
                 pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
         List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
         for (DataFileMeta file : split.files()) {
+            String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
             BulkFormatMapping bulkFormatMapping =
                     bulkFormatMappings.computeIfAbsent(
-                            file.schemaId(),
+                            new FormatKey(file.schemaId(), formatIdentifier),
                             key -> {
                                 TableSchema tableSchema = schemaManager.schema(this.schemaId);
-                                TableSchema dataSchema = schemaManager.schema(key);
+                                TableSchema dataSchema = schemaManager.schema(key.schemaId);
                                 int[][] dataProjection =
                                         SchemaEvolutionUtil.createDataProjection(
                                                 tableSchema.fields(),
@@ -111,7 +113,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
                                                 Projection.of(dataProjection).toTopLevelIndexes(),
                                                 dataSchema.fields());
                                 List<Predicate> dataFilters =
-                                        this.schemaId == key
+                                        this.schemaId == key.schemaId
                                                 ? filters
                                                 : SchemaEvolutionUtil.createDataFilters(
                                                         tableSchema.fields(),
@@ -119,8 +121,10 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
                                                         filters);
                                 return new BulkFormatMapping(
                                         indexMapping,
-                                        fileFormat.createReaderFactory(
-                                                rowType, dataProjection, dataFilters));
+                                        formatDiscover
+                                                .discover(formatIdentifier)
+                                                .createReaderFactory(
+                                                        rowType, dataProjection, dataFilters));
                             });
             suppliers.add(
                     () ->
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 18633be8..84a939f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderUtils;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
 import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.utils.ProjectedRowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -83,7 +83,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
             RowType valueType,
             Comparator<RowData> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            FileFormat fileFormat,
+            FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
             KeyValueFieldsExtractor extractor) {
         this.tableSchema = schemaManager.schema(schemaId);
@@ -93,7 +93,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
                         schemaId,
                         keyType,
                         valueType,
-                        fileFormat,
+                        formatDiscover,
                         pathFactory,
                         extractor);
         this.keyComparator = keyComparator;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index ec6f8453..ee245ab3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.format.FileFormatDiscover;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
@@ -89,7 +90,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         schemaId,
                         keyType,
                         valueType,
-                        options.fileFormat(),
+                        FileFormatDiscover.of(options),
                         pathFactory,
                         extractor);
         this.writerFactoryBuilder =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java
index 6f064e12..3e831c98 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
 import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
 import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -56,37 +56,38 @@ public class BulkFormatMapping {
     }
 
     public static BulkFormatMappingBuilder newBuilder(
-            FileFormat fileFormat,
+            FileFormatDiscover formatDiscover,
             KeyValueFieldsExtractor extractor,
             int[][] keyProjection,
             int[][] valueProjection,
             @Nullable List<Predicate> filters) {
         return new BulkFormatMappingBuilder(
-                fileFormat, extractor, keyProjection, valueProjection, filters);
+                formatDiscover, extractor, keyProjection, valueProjection, filters);
     }
 
     /** Builder to build {@link BulkFormatMapping}. */
     public static class BulkFormatMappingBuilder {
-        private final FileFormat fileFormat;
+        private final FileFormatDiscover formatDiscover;
         private final KeyValueFieldsExtractor extractor;
         private final int[][] keyProjection;
         private final int[][] valueProjection;
         @Nullable private final List<Predicate> filters;
 
         private BulkFormatMappingBuilder(
-                FileFormat fileFormat,
+                FileFormatDiscover formatDiscover,
                 KeyValueFieldsExtractor extractor,
                 int[][] keyProjection,
                 int[][] valueProjection,
                 @Nullable List<Predicate> filters) {
-            this.fileFormat = fileFormat;
+            this.formatDiscover = formatDiscover;
             this.extractor = extractor;
             this.keyProjection = keyProjection;
             this.valueProjection = valueProjection;
             this.filters = filters;
         }
 
-        public BulkFormatMapping build(TableSchema tableSchema, TableSchema dataSchema) {
+        public BulkFormatMapping build(
+                String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) {
             List<DataField> tableKeyFields = extractor.keyFields(tableSchema);
             List<DataField> tableValueFields = extractor.valueFields(tableSchema);
             int[][] tableProjection =
@@ -146,7 +147,9 @@ public class BulkFormatMapping {
                                     tableSchema.fields(), dataSchema.fields(), filters);
             return new BulkFormatMapping(
                     indexMapping,
-                    fileFormat.createReaderFactory(dataRecordType, dataProjection, dataFilters));
+                    formatDiscover
+                            .discover(formatIdentifier)
+                            .createReaderFactory(dataRecordType, dataProjection, dataFilters));
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FileFormatDiscover.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FileFormatDiscover.java
new file mode 100644
index 00000000..8a98573b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FileFormatDiscover.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format;
+
+import org.apache.flink.table.store.CoreOptions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Discovers the {@link FileFormat} for a given format identifier. */
+public interface FileFormatDiscover {
+
+    static FileFormatDiscover of(CoreOptions options) {
+        Map<String, FileFormat> formats = new HashMap<>();
+        return new FileFormatDiscover() {
+
+            @Override
+            public FileFormat discover(String identifier) {
+                return formats.computeIfAbsent(identifier, this::create);
+            }
+
+            private FileFormat create(String identifier) {
+                return FileFormat.fromIdentifier(identifier, options.toConfiguration());
+            }
+        };
+    }
+
+    FileFormat discover(String identifier);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FormatKey.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FormatKey.java
new file mode 100644
index 00000000..42f7799c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FormatKey.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format;
+
+import java.util.Objects;
+
+/** Format key for reading a file. */
+public class FormatKey {
+
+    public final long schemaId;
+    public final String format;
+
+    public FormatKey(long schemaId, String format) {
+        this.schemaId = schemaId;
+        this.format = format;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FormatKey formatKey = (FormatKey) o;
+        return schemaId == formatKey.schemaId && Objects.equals(format, formatKey.format);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schemaId, format);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
index 74f80dfd..f39a5a9b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
@@ -68,7 +68,7 @@ public class KeyValueFileReadWriteTest {
     public void testReadNonExistentFile() {
         KeyValueFileReaderFactory readerFactory =
                 createReaderFactory(tempDir.toString(), "avro", null, null);
-        assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file", 0))
+        assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 0))
                 .hasMessageContaining(
                         "you can configure 'snapshot.time-retained' option with a larger value.");
     }
@@ -263,7 +263,7 @@ public class KeyValueFileReadWriteTest {
                         0,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
-                        new FlushingFileFormat(format),
+                        ignore -> new FlushingFileFormat(format),
                         pathFactory,
                         new TestKeyValueGenerator.TestKeyValueFieldsExtractor());
         if (keyProjection != null) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 0ef7fdd5..f80052c6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -141,7 +141,7 @@ public class MergeTreeTest {
                         0,
                         keyType,
                         valueType,
-                        flushingAvro,
+                        ignore -> flushingAvro,
                         pathFactory,
                         new KeyValueFieldsExtractor() {
                             @Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 8965e97f..a19a4e79 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -69,6 +69,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
 import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.flink.table.store.CoreOptions.FILE_FORMAT;
 import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -143,6 +144,40 @@ public abstract class FileStoreTableTestBase {
         assertThat(traceableFileSystem.openOutputStreams(pathPredicate)).isEmpty();
     }
 
+    @Test
+    public void testChangeFormat() throws Exception {
+        FileStoreTable table = createFileStoreTable(conf -> conf.set(FILE_FORMAT, "orc"));
+
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        assertThat(getResult(table.newRead(), table.newScan().plan().splits(), BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        table = createFileStoreTable(conf -> conf.set(FILE_FORMAT, "avro"));
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(rowData(1, 11, 111L));
+        write.write(rowData(2, 22, 222L));
+        commit.commit(1, write.prepareCommit(true, 1));
+        write.close();
+        commit.close();
+
+        assertThat(getResult(table.newRead(), table.newScan().plan().splits(), BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|11|111|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|22|222|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
     @Test
     public void testOverwrite() throws Exception {
         FileStoreTable table = createFileStoreTable();