You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/20 08:30:32 UTC

[incubator-paimon] 01/02: [core] add file-format-per-level for paimon

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 78f78053075e0e88925d2c98bcfeaf1462201ca1
Author: zhangjun <zh...@126.com>
AuthorDate: Tue Jul 4 22:56:27 2023 +0800

    [core] add file-format-per-level for paimon
    
    This closes #1500
---
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  17 +++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  22 ++++
 .../paimon/io/KeyValueFileWriterFactory.java       | 114 +++++++++++++++++----
 .../apache/paimon/mergetree/MergeTreeWriter.java   |   8 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   6 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   2 +-
 .../paimon/io/KeyValueFileReadWriteTest.java       |  23 ++++-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  15 ++-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  16 ++-
 .../apache/paimon/flink/CatalogTableITCase.java    |  18 ++++
 .../flink/source/TestChangelogDataReadWrite.java   |   6 ++
 12 files changed, 216 insertions(+), 37 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 36dba2b52..d5a09f48a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -152,6 +152,12 @@ under the License.
             <td><p>Enum</p></td>
             <td>Specify the message format of data files, currently orc, parquet and avro are supported.<br /><br />Possible values:<ul><li>"orc": ORC file format.</li><li>"parquet": Parquet file format.</li><li>"avro": Avro file format.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>file.format.per.level</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>Map</td>
+            <td>Define different file formats for different levels. You can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet'. If the file format for a level is not provided, the default format set by `file.format` will be used.</td>
+        </tr>
         <tr>
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 3552bc21d..084cf45df 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -117,6 +117,17 @@ public class CoreOptions implements Serializable {
                                     + "could be NONE, ZLIB, SNAPPY, LZO, LZ4, for parquet file format, the compression value could be "
                                     + "UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.");
 
+    public static final ConfigOption<Map<String, String>> FILE_FORMAT_PER_LEVEL =
+            key("file.format.per.level")
+                    .mapType()
+                    .defaultValue(new HashMap<>())
+                    .withDescription(
+                            "Define different file formats for different levels. You can add the conf like this:"
+                                    + " 'file.format.per.level' = '0:avro,3:parquet'. If the file format for a level is not provided, "
+                                    + "the default format set by `"
+                                    + FILE_FORMAT.key()
+                                    + "` will be used.");
+
     public static final ConfigOption<String> FILE_COMPRESSION =
             key("file.compression")
                     .stringType()
@@ -885,6 +896,12 @@ public class CoreOptions implements Serializable {
                 .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
     }
 
+    public Map<Integer, String> fileFormatPerLevel() {
+        Map<String, String> levelFormats = options.get(FILE_FORMAT_PER_LEVEL);
+        return levelFormats.entrySet().stream()
+                .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
+    }
+
     public String fileCompression() {
         return options.get(FILE_COMPRESSION);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index ad0ad67dd..ba1ef81db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -35,11 +35,16 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.ValueEqualiserSupplier;
 
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 
 import static org.apache.paimon.predicate.PredicateBuilder.and;
@@ -127,6 +132,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 valueEqualiserSupplier,
                 mfFactory,
                 pathFactory(),
+                format2PathFactory(),
                 snapshotManager(),
                 newScan(true).withManifestCacheFilter(manifestFilter),
                 indexFactory,
@@ -134,6 +140,22 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 keyValueFieldsExtractor);
     }
 
+    private Map<String, FileStorePathFactory> format2PathFactory() {
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        Set<String> formats = new HashSet<>(options.fileFormatPerLevel().values());
+        formats.add(options.fileFormat().getFormatIdentifier());
+        formats.forEach(
+                format ->
+                        pathFactoryMap.put(
+                                format,
+                                new FileStorePathFactory(
+                                        options.path(),
+                                        partitionType,
+                                        options.partitionDefaultName(),
+                                        format)));
+        return pathFactoryMap;
+    }
+
     private KeyValueFileStoreScan newScan(boolean forWrite) {
         ScanBucketFilter bucketFilter =
                 new ScanBucketFilter(bucketKeyType) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index f3912daf5..a19db4aa5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -34,7 +34,9 @@ import org.apache.paimon.utils.StatsCollectorFactories;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. */
 public class KeyValueFileWriterFactory {
@@ -43,12 +45,15 @@ public class KeyValueFileWriterFactory {
     private final long schemaId;
     private final RowType keyType;
     private final RowType valueType;
-    private final FormatWriterFactory writerFactory;
+    private final RowType recordType;
+    private final Map<String, FormatWriterFactory> writerFactoryMap;
     @Nullable private final TableStatsExtractor tableStatsExtractor;
-    private final DataFilePathFactory pathFactory;
+    private final Map<String, DataFilePathFactory> pathFactoryMap;
     private final long suggestedFileSize;
     private final Map<Integer, String> levelCompressions;
     private final String fileCompression;
+    private final Map<Integer, String> levelFormats;
+    private final FileFormat fileFormat;
     private final CoreOptions options;
 
     private KeyValueFileWriterFactory(
@@ -56,23 +61,29 @@ public class KeyValueFileWriterFactory {
             long schemaId,
             RowType keyType,
             RowType valueType,
-            FormatWriterFactory writerFactory,
+            RowType recordType,
+            FileFormat fileFormat,
+            Map<String, FormatWriterFactory> writerFactoryMap,
             @Nullable TableStatsExtractor tableStatsExtractor,
-            DataFilePathFactory pathFactory,
+            Map<String, DataFilePathFactory> pathFactoryMap,
             long suggestedFileSize,
             Map<Integer, String> levelCompressions,
             String fileCompression,
+            Map<Integer, String> levelFormats,
             CoreOptions options) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.keyType = keyType;
         this.valueType = valueType;
-        this.writerFactory = writerFactory;
+        this.recordType = recordType;
+        this.fileFormat = fileFormat;
+        this.writerFactoryMap = writerFactoryMap;
         this.tableStatsExtractor = tableStatsExtractor;
-        this.pathFactory = pathFactory;
+        this.pathFactoryMap = pathFactoryMap;
         this.suggestedFileSize = suggestedFileSize;
         this.levelCompressions = levelCompressions;
         this.fileCompression = fileCompression;
+        this.levelFormats = levelFormats;
         this.options = options;
     }
 
@@ -85,13 +96,18 @@ public class KeyValueFileWriterFactory {
     }
 
     @VisibleForTesting
-    public DataFilePathFactory pathFactory() {
-        return pathFactory;
+    public DataFilePathFactory pathFactory(String format) {
+        return pathFactoryMap.get(format);
     }
 
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(int level) {
+        String fileFormat = getFileFormat(level);
         return new RollingFileWriter<>(
-                () -> createDataFileWriter(pathFactory.newPath(), level, getCompression(level)),
+                () ->
+                        createDataFileWriter(
+                                pathFactoryMap.get(fileFormat).newPath(),
+                                level,
+                                getCompression(level)),
                 suggestedFileSize);
     }
 
@@ -103,32 +119,55 @@ public class KeyValueFileWriterFactory {
         }
     }
 
+    private String getFileFormat(int level) {
+        if (null == levelFormats) {
+            return fileFormat.getFormatIdentifier();
+        } else {
+            return levelFormats.getOrDefault(level, fileFormat.getFormatIdentifier());
+        }
+    }
+
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
+
         return new RollingFileWriter<>(
                 () ->
                         createDataFileWriter(
-                                pathFactory.newChangelogPath(), level, getCompression(level)),
+                                pathFactoryMap.get(getFileFormat(level)).newChangelogPath(),
+                                level,
+                                getCompression(level)),
                 suggestedFileSize);
     }
 
     private KeyValueDataFileWriter createDataFileWriter(Path path, int level, String compression) {
         KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
+        String fileFormat = getFileFormat(level);
         return new KeyValueDataFileWriter(
                 fileIO,
-                writerFactory,
+                writerFactoryMap.get(fileFormat),
                 path,
                 kvSerializer::toRow,
                 keyType,
                 valueType,
-                tableStatsExtractor,
+                getTableStatsExtractor(fileFormat),
                 schemaId,
                 level,
                 compression,
                 options);
     }
 
-    public void deleteFile(String filename) {
-        fileIO.deleteQuietly(pathFactory.toPath(filename));
+    private TableStatsExtractor getTableStatsExtractor(String fileFormat) {
+        return null == fileFormat || fileFormat.equals(this.fileFormat.getFormatIdentifier())
+                ? tableStatsExtractor // prebuilt for the table's default format, reuse it
+                : FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
+                        .createStatsExtractor(
+                                recordType,
+                                StatsCollectorFactories.createStatsFactories(
+                                        options, recordType.getFieldNames()))
+                        .orElse(null);
+    }
+
+    public void deleteFile(String filename, int level) {
+        fileIO.deleteQuietly(pathFactoryMap.get(getFileFormat(level)).toPath(filename));
     }
 
     public static Builder builder(
@@ -137,10 +176,16 @@ public class KeyValueFileWriterFactory {
             RowType keyType,
             RowType valueType,
             FileFormat fileFormat,
-            FileStorePathFactory pathFactory,
+            Map<String, FileStorePathFactory> format2PathFactory,
             long suggestedFileSize) {
         return new Builder(
-                fileIO, schemaId, keyType, valueType, fileFormat, pathFactory, suggestedFileSize);
+                fileIO,
+                schemaId,
+                keyType,
+                valueType,
+                fileFormat,
+                format2PathFactory,
+                suggestedFileSize);
     }
 
     /** Builder of {@link KeyValueFileWriterFactory}. */
@@ -151,7 +196,7 @@ public class KeyValueFileWriterFactory {
         private final RowType keyType;
         private final RowType valueType;
         private final FileFormat fileFormat;
-        private final FileStorePathFactory pathFactory;
+        private final Map<String, FileStorePathFactory> format2PathFactory;
         private final long suggestedFileSize;
 
         private Builder(
@@ -160,14 +205,14 @@ public class KeyValueFileWriterFactory {
                 RowType keyType,
                 RowType valueType,
                 FileFormat fileFormat,
-                FileStorePathFactory pathFactory,
+                Map<String, FileStorePathFactory> format2PathFactory,
                 long suggestedFileSize) {
             this.fileIO = fileIO;
             this.schemaId = schemaId;
             this.keyType = keyType;
             this.valueType = valueType;
             this.fileFormat = fileFormat;
-            this.pathFactory = pathFactory;
+            this.format2PathFactory = format2PathFactory;
             this.suggestedFileSize = suggestedFileSize;
         }
 
@@ -176,24 +221,51 @@ public class KeyValueFileWriterFactory {
                 int bucket,
                 Map<Integer, String> levelCompressions,
                 String fileCompression,
+                Map<Integer, String> levelFormats,
                 CoreOptions options) {
             RowType recordType = KeyValue.schema(keyType, valueType);
+
+            Map<String, FormatWriterFactory> writerFactoryMap = new HashMap<>();
+            writerFactoryMap.put(
+                    fileFormat.getFormatIdentifier(), fileFormat.createWriterFactory(recordType));
+            if (null != levelFormats) {
+                for (String fileFormat : levelFormats.values()) {
+                    writerFactoryMap.putIfAbsent(
+                            fileFormat,
+                            FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
+                                    .createWriterFactory(recordType));
+                }
+            }
+
+            Map<String, DataFilePathFactory> dataFilePathFactoryMap =
+                    format2PathFactory.entrySet().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            Map.Entry::getKey,
+                                            e ->
+                                                    e.getValue()
+                                                            .createDataFilePathFactory(
+                                                                    partition, bucket)));
+
             return new KeyValueFileWriterFactory(
                     fileIO,
                     schemaId,
                     keyType,
                     valueType,
-                    fileFormat.createWriterFactory(recordType),
+                    recordType,
+                    fileFormat,
+                    writerFactoryMap,
                     fileFormat
                             .createStatsExtractor(
                                     recordType,
                                     StatsCollectorFactories.createStatsFactories(
                                             options, recordType.getFieldNames()))
                             .orElse(null),
-                    pathFactory.createDataFilePathFactory(partition, bucket),
+                    dataFilePathFactoryMap,
                     suggestedFileSize,
                     levelCompressions,
                     fileCompression,
+                    levelFormats,
                     options);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index dfe22f2fe..c43f97604 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -270,7 +270,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
                 // 2. This file is not the input of upgraded.
                 if (!compactBefore.containsKey(file.fileName())
                         && !afterFiles.contains(file.fileName())) {
-                    writerFactory.deleteFile(file.fileName());
+                    writerFactory.deleteFile(file.fileName(), file.level());
                 }
             } else {
                 compactBefore.put(file.fileName(), file);
@@ -297,7 +297,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         newFiles.clear();
 
         for (DataFileMeta file : newFilesChangelog) {
-            writerFactory.deleteFile(file.fileName());
+            writerFactory.deleteFile(file.fileName(), file.level());
         }
         newFilesChangelog.clear();
 
@@ -312,12 +312,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         compactAfter.clear();
 
         for (DataFileMeta file : compactChangelog) {
-            writerFactory.deleteFile(file.fileName());
+            writerFactory.deleteFile(file.fileName(), file.level());
         }
         compactChangelog.clear();
 
         for (DataFileMeta file : delete) {
-            writerFactory.deleteFile(file.fileName());
+            writerFactory.deleteFile(file.fileName(), file.level());
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 4c5d6926c..06bb373cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -62,6 +62,7 @@ import javax.annotation.Nullable;
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
@@ -93,6 +94,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             Supplier<RecordEqualiser> valueEqualiserSupplier,
             MergeFunctionFactory<KeyValue> mfFactory,
             FileStorePathFactory pathFactory,
+            Map<String, FileStorePathFactory> format2PathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             @Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
@@ -119,7 +121,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         keyType,
                         valueType,
                         options.fileFormat(),
-                        pathFactory,
+                        format2PathFactory,
                         options.targetFileSize());
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.valueEqualiserSupplier = valueEqualiserSupplier;
@@ -148,6 +150,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         bucket,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
@@ -212,6 +215,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         bucket,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager);
         switch (options.changelogProducer()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 09b36d2e6..4b8e98ec2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -56,7 +56,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
     public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception {
         String format = "avro";
         KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
-        Path path = writerFactory.pathFactory().newPath();
+        Path path = writerFactory.pathFactory(format).newPath();
         Assertions.assertTrue(path.getPath().endsWith(format));
 
         DataFilePathFactory dataFilePathFactory =
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index d612a1d1b..0e2e5a3b1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -49,8 +49,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
@@ -244,6 +246,17 @@ public class KeyValueFileReadWriteTest {
         FileIO fileIO = FileIOFinder.find(path);
         Options options = new Options();
         options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
+
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put(format, pathFactory);
+        pathFactoryMap.put(
+                CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                new FileStorePathFactory(
+                        path,
+                        RowType.of(),
+                        CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
+                        CoreOptions.FILE_FORMAT.defaultValue().toString()));
+
         return KeyValueFileWriterFactory.builder(
                         fileIO,
                         0,
@@ -253,9 +266,15 @@ public class KeyValueFileReadWriteTest {
                         // if the written file size is really larger than suggested, so we use a
                         // special format which flushes for every added element
                         new FlushingFileFormat(format),
-                        pathFactory,
+                        pathFactoryMap,
                         suggestedFileSize)
-                .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(options));
+                .build(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        null,
+                        new HashMap<>(),
+                        new CoreOptions(options));
     }
 
     private KeyValueFileReaderFactory createReaderFactory(
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 114ff92a3..3c6c19182 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -228,15 +228,24 @@ public class LookupLevelsTest {
 
     private KeyValueFileWriterFactory createWriterFactory() {
         Path path = new Path(tempDir.toUri().toString());
+        String identifier = "avro";
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put(identifier, new FileStorePathFactory(path));
         return KeyValueFileWriterFactory.builder(
                         FileIOFinder.find(path),
                         0,
                         keyType,
                         rowType,
-                        new FlushingFileFormat("avro"),
-                        new FileStorePathFactory(path),
+                        new FlushingFileFormat(identifier),
+                        pathFactoryMap,
                         TARGET_FILE_SIZE.defaultValue().getBytes())
-                .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new Options()));
+                .build(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        null,
+                        new HashMap<>(),
+                        new CoreOptions(new Options()));
     }
 
     private KeyValueFileReaderFactory createReaderFactory() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index fd36afa8a..2983028b7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -105,6 +105,7 @@ public abstract class MergeTreeTestBase {
     private KeyValueFileWriterFactory compactWriterFactory;
     private MergeTreeWriter writer;
     private SortEngine sortEngine;
+    private String identifier = "avro";
 
     @BeforeEach
     public void beforeEach() throws IOException {
@@ -113,7 +114,7 @@ public abstract class MergeTreeTestBase {
         comparator = Comparator.comparingInt(o -> o.getInt(0));
         sortEngine = getSortEngine();
         recreateMergeTree(1024 * 1024);
-        Path bucketDir = writerFactory.pathFactory().toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
         LocalFileIO.create().mkdirs(bucketDir);
     }
 
@@ -142,7 +143,7 @@ public abstract class MergeTreeTestBase {
         RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
         RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));
 
-        FileFormat flushingAvro = new FlushingFileFormat("avro");
+        FileFormat flushingAvro = new FlushingFileFormat(identifier);
         KeyValueFileReaderFactory.Builder readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
                         LocalFileIO.create(),
@@ -173,6 +174,9 @@ public abstract class MergeTreeTestBase {
                         });
         readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
         compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
+
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put(identifier, pathFactory);
         KeyValueFileWriterFactory.Builder writerFactoryBuilder =
                 KeyValueFileWriterFactory.builder(
                         LocalFileIO.create(),
@@ -180,7 +184,7 @@ public abstract class MergeTreeTestBase {
                         keyType,
                         valueType,
                         flushingAvro,
-                        pathFactory,
+                        pathFactoryMap,
                         options.targetFileSize());
         writerFactory =
                 writerFactoryBuilder.build(
@@ -188,6 +192,7 @@ public abstract class MergeTreeTestBase {
                         0,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         compactWriterFactory =
                 writerFactoryBuilder.build(
@@ -195,6 +200,7 @@ public abstract class MergeTreeTestBase {
                         0,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         writer = createMergeTreeWriter(Collections.emptyList());
     }
@@ -406,7 +412,7 @@ public abstract class MergeTreeTestBase {
 
         writer.close();
 
-        Path bucketDir = writerFactory.pathFactory().toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
         Set<String> files =
                 Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
                         .map(FileStatus::getPath)
@@ -502,7 +508,7 @@ public abstract class MergeTreeTestBase {
             assertThat(remove).isTrue();
             // See MergeTreeWriter.updateCompactResult
             if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) {
-                compactWriterFactory.deleteFile(file.fileName());
+                compactWriterFactory.deleteFile(file.fileName(), file.level());
             }
         }
         compactedFiles.addAll(increment.compactIncrement().compactAfter());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 746d097ab..e38bea757 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -343,6 +343,24 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         "Can not set the write-mode to append-only and changelog-producer at the same time.");
     }
 
+    @Test
+    public void testFileFormatPerLevel() {
+        sql(
+                "CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING) "
+                        + "WITH ('num-sorted-run.compaction-trigger'='2',"
+                        + "'file.format.per.level' = '0:avro,3:parquet',"
+                        + " 'num-levels' = '4')");
+        sql("INSERT INTO T1 SELECT 1,'AAA'");
+        sql("INSERT INTO T1 SELECT 2,'BBB'");
+        sql("INSERT INTO T1 SELECT 3,'CCC'");
+        List<Row> rows = sql("SELECT * FROM T1");
+        assertThat(rows)
+                .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"));
+
+        rows = sql("SELECT level,file_format FROM T1$files");
+        assertThat(rows).containsExactlyInAnyOrder(Row.of(3, "parquet"), Row.of(0, "avro"));
+    }
+
     @Test
     public void testFilesTable() throws Exception {
         sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 3106ed679..a8741b450 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -57,7 +57,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.Function;
 
@@ -171,6 +173,9 @@ public class TestChangelogDataReadWrite {
     public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRow partition, int bucket) {
         CoreOptions options =
                 new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
+
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put("avro", pathFactory);
         RecordWriter<KeyValue> writer =
                 new KeyValueFileStoreWrite(
                                 LocalFileIO.create(),
@@ -183,6 +188,7 @@ public class TestChangelogDataReadWrite {
                                 () -> EQUALISER,
                                 DeduplicateMergeFunction.factory(),
                                 pathFactory,
+                                pathFactoryMap,
                                 snapshotManager,
                                 null, // not used, we only create an empty writer
                                 null,