You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/20 08:30:32 UTC
[incubator-paimon] 01/02: [core] add file-format-per-level for paimon
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 78f78053075e0e88925d2c98bcfeaf1462201ca1
Author: zhangjun <zh...@126.com>
AuthorDate: Tue Jul 4 22:56:27 2023 +0800
[core] add file-format-per-level for paimon
This closes #1500
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 17 +++
.../java/org/apache/paimon/KeyValueFileStore.java | 22 ++++
.../paimon/io/KeyValueFileWriterFactory.java | 114 +++++++++++++++++----
.../apache/paimon/mergetree/MergeTreeWriter.java | 8 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 6 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 2 +-
.../paimon/io/KeyValueFileReadWriteTest.java | 23 ++++-
.../apache/paimon/mergetree/LookupLevelsTest.java | 15 ++-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 16 ++-
.../apache/paimon/flink/CatalogTableITCase.java | 18 ++++
.../flink/source/TestChangelogDataReadWrite.java | 6 ++
12 files changed, 216 insertions(+), 37 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 36dba2b52..d5a09f48a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -152,6 +152,12 @@ under the License.
<td><p>Enum</p></td>
<td>Specify the message format of data files, currently orc, parquet and avro are supported.<br /><br />Possible values:<ul><li>"orc": ORC file format.</li><li>"parquet": Parquet file format.</li><li>"avro": Avro file format.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>file.format.per.level</h5></td>
+ <td style="word-wrap: break-word;"></td>
+ <td>Map</td>
+ <td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
+ </tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 3552bc21d..084cf45df 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -117,6 +117,17 @@ public class CoreOptions implements Serializable {
+ "could be NONE, ZLIB, SNAPPY, LZO, LZ4, for parquet file format, the compression value could be "
+ "UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.");
+ public static final ConfigOption<Map<String, String>> FILE_FORMAT_PER_LEVEL = // per-level override of the data file format; keys and values are both strings at this layer
+ key("file.format.per.level")
+ .mapType()
+ .defaultValue(new HashMap<>()) // NOTE(review): the default is a mutable HashMap held by this static option - presumably callers must not mutate it; confirm
+ .withDescription(
+ "Define different file format for different level, you can add the conf like this:"
+ + " 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, "
+ + "the default format which set by `"
+ + FILE_FORMAT.key() // keeps the description in sync with the actual key of the default-format option
+ + "` will be used.");
+
public static final ConfigOption<String> FILE_COMPRESSION =
key("file.compression")
.stringType()
@@ -885,6 +896,12 @@ public class CoreOptions implements Serializable {
.collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
}
+ /**
+  * Returns the file format identifiers configured via {@code file.format.per.level}, keyed by
+  * level.
+  *
+  * <p>The option stores levels as strings; each key is converted with {@link Integer#valueOf},
+  * so a non-numeric key fails with a {@link NumberFormatException}.
+  */
+ public Map<Integer, String> fileFormatPerLevel() {
+     return options.get(FILE_FORMAT_PER_LEVEL).entrySet().stream()
+             .collect(
+                     Collectors.toMap(
+                             entry -> Integer.valueOf(entry.getKey()),
+                             entry -> entry.getValue()));
+ }
+
public String fileCompression() {
return options.get(FILE_COMPRESSION);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index ad0ad67dd..ba1ef81db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -35,11 +35,16 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.ValueEqualiserSupplier;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.function.Supplier;
import static org.apache.paimon.predicate.PredicateBuilder.and;
@@ -127,6 +132,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
valueEqualiserSupplier,
mfFactory,
pathFactory(),
+ format2PathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
@@ -134,6 +140,22 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
keyValueFieldsExtractor);
}
+ /**
+  * Builds one {@link FileStorePathFactory} per format identifier that this store may write:
+  * every per-level override plus the table's default format. Each factory is constructed
+  * with its own identifier.
+  */
+ private Map<String, FileStorePathFactory> format2PathFactory() {
+     // A set removes duplicates when several levels share a format or repeat the default.
+     Set<String> identifiers = new HashSet<>(options.fileFormatPerLevel().values());
+     identifiers.add(options.fileFormat().getFormatIdentifier());
+
+     Map<String, FileStorePathFactory> factories = new HashMap<>();
+     for (String identifier : identifiers) {
+         FileStorePathFactory factory =
+                 new FileStorePathFactory(
+                         options.path(),
+                         partitionType,
+                         options.partitionDefaultName(),
+                         identifier);
+         factories.put(identifier, factory);
+     }
+     return factories;
+ }
+
private KeyValueFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index f3912daf5..a19db4aa5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -34,7 +34,9 @@ import org.apache.paimon.utils.StatsCollectorFactories;
import javax.annotation.Nullable;
+import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
/** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. */
public class KeyValueFileWriterFactory {
@@ -43,12 +45,15 @@ public class KeyValueFileWriterFactory {
private final long schemaId;
private final RowType keyType;
private final RowType valueType;
- private final FormatWriterFactory writerFactory;
+ private final RowType recordType;
+ private final Map<String, FormatWriterFactory> writerFactoryMap;
@Nullable private final TableStatsExtractor tableStatsExtractor;
- private final DataFilePathFactory pathFactory;
+ private final Map<String, DataFilePathFactory> pathFactoryMap;
private final long suggestedFileSize;
private final Map<Integer, String> levelCompressions;
private final String fileCompression;
+ private final Map<Integer, String> levelFormats;
+ private final FileFormat fileFormat;
private final CoreOptions options;
private KeyValueFileWriterFactory(
@@ -56,23 +61,29 @@ public class KeyValueFileWriterFactory {
long schemaId,
RowType keyType,
RowType valueType,
- FormatWriterFactory writerFactory,
+ RowType recordType,
+ FileFormat fileFormat,
+ Map<String, FormatWriterFactory> writerFactoryMap,
@Nullable TableStatsExtractor tableStatsExtractor,
- DataFilePathFactory pathFactory,
+ Map<String, DataFilePathFactory> pathFactoryMap,
long suggestedFileSize,
Map<Integer, String> levelCompressions,
String fileCompression,
+ Map<Integer, String> levelFormats,
CoreOptions options) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
- this.writerFactory = writerFactory;
+ this.recordType = recordType;
+ this.fileFormat = fileFormat;
+ this.writerFactoryMap = writerFactoryMap;
this.tableStatsExtractor = tableStatsExtractor;
- this.pathFactory = pathFactory;
+ this.pathFactoryMap = pathFactoryMap;
this.suggestedFileSize = suggestedFileSize;
this.levelCompressions = levelCompressions;
this.fileCompression = fileCompression;
+ this.levelFormats = levelFormats;
this.options = options;
}
@@ -85,13 +96,18 @@ public class KeyValueFileWriterFactory {
}
@VisibleForTesting
- public DataFilePathFactory pathFactory() {
- return pathFactory;
+ public DataFilePathFactory pathFactory(String format) {
+ return pathFactoryMap.get(format);
}
public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(int level) {
+ String fileFormat = getFileFormat(level);
return new RollingFileWriter<>(
- () -> createDataFileWriter(pathFactory.newPath(), level, getCompression(level)),
+ () ->
+ createDataFileWriter(
+ pathFactoryMap.get(fileFormat).newPath(),
+ level,
+ getCompression(level)),
suggestedFileSize);
}
@@ -103,32 +119,55 @@ public class KeyValueFileWriterFactory {
}
}
+ /**
+  * Resolves the format identifier used for files of {@code level}: the per-level override
+  * when one is configured, otherwise the identifier of the default file format.
+  */
+ private String getFileFormat(int level) {
+     String defaultIdentifier = fileFormat.getFormatIdentifier();
+     if (levelFormats == null) {
+         // No per-level configuration was supplied at all.
+         return defaultIdentifier;
+     }
+     return levelFormats.getOrDefault(level, defaultIdentifier);
+ }
+
public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
+
return new RollingFileWriter<>(
() ->
createDataFileWriter(
- pathFactory.newChangelogPath(), level, getCompression(level)),
+ pathFactoryMap.get(getFileFormat(level)).newChangelogPath(),
+ level,
+ getCompression(level)),
suggestedFileSize);
}
private KeyValueDataFileWriter createDataFileWriter(Path path, int level, String compression) {
KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
+ String fileFormat = getFileFormat(level);
return new KeyValueDataFileWriter(
fileIO,
- writerFactory,
+ writerFactoryMap.get(fileFormat),
path,
kvSerializer::toRow,
keyType,
valueType,
- tableStatsExtractor,
+ getTableStatsExtractor(fileFormat),
schemaId,
level,
compression,
options);
}
- public void deleteFile(String filename) {
- fileIO.deleteQuietly(pathFactory.toPath(filename));
+ /**
+  * Returns the stats extractor for data files written in {@code fileFormat}.
+  *
+  * <p>The extractor created once for the default format is reused when {@code fileFormat} is
+  * null or is the default format's identifier. Only a different (per-level) format is
+  * resolved from its identifier, which avoids re-resolving the default format for every
+  * data file writer.
+  *
+  * @param fileFormat format identifier of the file about to be written
+  * @return the extractor for that format, or null when the format provides none
+  */
+ @Nullable
+ private TableStatsExtractor getTableStatsExtractor(String fileFormat) {
+     if (fileFormat == null || fileFormat.equals(this.fileFormat.getFormatIdentifier())) {
+         return tableStatsExtractor;
+     }
+     return FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
+             .createStatsExtractor(
+                     recordType,
+                     StatsCollectorFactories.createStatsFactories(
+                             options, recordType.getFieldNames()))
+             .orElse(null);
+ }
+
+ public void deleteFile(String filename, int level) {
+ fileIO.deleteQuietly(pathFactoryMap.get(getFileFormat(level)).toPath(filename));
}
public static Builder builder(
@@ -137,10 +176,16 @@ public class KeyValueFileWriterFactory {
RowType keyType,
RowType valueType,
FileFormat fileFormat,
- FileStorePathFactory pathFactory,
+ Map<String, FileStorePathFactory> format2PathFactory,
long suggestedFileSize) {
return new Builder(
- fileIO, schemaId, keyType, valueType, fileFormat, pathFactory, suggestedFileSize);
+ fileIO,
+ schemaId,
+ keyType,
+ valueType,
+ fileFormat,
+ format2PathFactory,
+ suggestedFileSize);
}
/** Builder of {@link KeyValueFileWriterFactory}. */
@@ -151,7 +196,7 @@ public class KeyValueFileWriterFactory {
private final RowType keyType;
private final RowType valueType;
private final FileFormat fileFormat;
- private final FileStorePathFactory pathFactory;
+ private final Map<String, FileStorePathFactory> format2PathFactory;
private final long suggestedFileSize;
private Builder(
@@ -160,14 +205,14 @@ public class KeyValueFileWriterFactory {
RowType keyType,
RowType valueType,
FileFormat fileFormat,
- FileStorePathFactory pathFactory,
+ Map<String, FileStorePathFactory> format2PathFactory,
long suggestedFileSize) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
this.fileFormat = fileFormat;
- this.pathFactory = pathFactory;
+ this.format2PathFactory = format2PathFactory;
this.suggestedFileSize = suggestedFileSize;
}
@@ -176,24 +221,51 @@ public class KeyValueFileWriterFactory {
int bucket,
Map<Integer, String> levelCompressions,
String fileCompression,
+ Map<Integer, String> levelFormats,
CoreOptions options) {
RowType recordType = KeyValue.schema(keyType, valueType);
+
+ Map<String, FormatWriterFactory> writerFactoryMap = new HashMap<>();
+ writerFactoryMap.put(
+ fileFormat.getFormatIdentifier(), fileFormat.createWriterFactory(recordType));
+ if (null != levelFormats) {
+ for (String fileFormat : levelFormats.values()) {
+ writerFactoryMap.putIfAbsent(
+ fileFormat,
+ FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
+ .createWriterFactory(recordType));
+ }
+ }
+
+ Map<String, DataFilePathFactory> dataFilePathFactoryMap =
+ format2PathFactory.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e ->
+ e.getValue()
+ .createDataFilePathFactory(
+ partition, bucket)));
+
return new KeyValueFileWriterFactory(
fileIO,
schemaId,
keyType,
valueType,
- fileFormat.createWriterFactory(recordType),
+ recordType,
+ fileFormat,
+ writerFactoryMap,
fileFormat
.createStatsExtractor(
recordType,
StatsCollectorFactories.createStatsFactories(
options, recordType.getFieldNames()))
.orElse(null),
- pathFactory.createDataFilePathFactory(partition, bucket),
+ dataFilePathFactoryMap,
suggestedFileSize,
levelCompressions,
fileCompression,
+ levelFormats,
options);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index dfe22f2fe..c43f97604 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -270,7 +270,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
// 2. This file is not the input of upgraded.
if (!compactBefore.containsKey(file.fileName())
&& !afterFiles.contains(file.fileName())) {
- writerFactory.deleteFile(file.fileName());
+ writerFactory.deleteFile(file.fileName(), file.level());
}
} else {
compactBefore.put(file.fileName(), file);
@@ -297,7 +297,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
newFiles.clear();
for (DataFileMeta file : newFilesChangelog) {
- writerFactory.deleteFile(file.fileName());
+ writerFactory.deleteFile(file.fileName(), file.level());
}
newFilesChangelog.clear();
@@ -312,12 +312,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
compactAfter.clear();
for (DataFileMeta file : compactChangelog) {
- writerFactory.deleteFile(file.fileName());
+ writerFactory.deleteFile(file.fileName(), file.level());
}
compactChangelog.clear();
for (DataFileMeta file : delete) {
- writerFactory.deleteFile(file.fileName());
+ writerFactory.deleteFile(file.fileName(), file.level());
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 4c5d6926c..06bb373cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -62,6 +62,7 @@ import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@@ -93,6 +94,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
Supplier<RecordEqualiser> valueEqualiserSupplier,
MergeFunctionFactory<KeyValue> mfFactory,
FileStorePathFactory pathFactory,
+ Map<String, FileStorePathFactory> format2PathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
@@ -119,7 +121,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
keyType,
valueType,
options.fileFormat(),
- pathFactory,
+ format2PathFactory,
options.targetFileSize());
this.keyComparatorSupplier = keyComparatorSupplier;
this.valueEqualiserSupplier = valueEqualiserSupplier;
@@ -148,6 +150,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
bucket,
options.fileCompressionPerLevel(),
options.fileCompression(),
+ options.fileFormatPerLevel(),
options);
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
@@ -212,6 +215,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
bucket,
options.fileCompressionPerLevel(),
options.fileCompression(),
+ options.fileFormatPerLevel(),
options);
MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager);
switch (options.changelogProducer()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 09b36d2e6..4b8e98ec2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -56,7 +56,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception {
String format = "avro";
KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
- Path path = writerFactory.pathFactory().newPath();
+ Path path = writerFactory.pathFactory(format).newPath();
Assertions.assertTrue(path.getPath().endsWith(format));
DataFilePathFactory dataFilePathFactory =
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index d612a1d1b..0e2e5a3b1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -49,8 +49,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
@@ -244,6 +246,17 @@ public class KeyValueFileReadWriteTest {
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
+
+ Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+ pathFactoryMap.put(format, pathFactory);
+ pathFactoryMap.put(
+ CoreOptions.FILE_FORMAT.defaultValue().toString(),
+ new FileStorePathFactory(
+ path,
+ RowType.of(),
+ CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
+ CoreOptions.FILE_FORMAT.defaultValue().toString()));
+
return KeyValueFileWriterFactory.builder(
fileIO,
0,
@@ -253,9 +266,15 @@ public class KeyValueFileReadWriteTest {
// if the written file size is really larger than suggested, so we use a
// special format which flushes for every added element
new FlushingFileFormat(format),
- pathFactory,
+ pathFactoryMap,
suggestedFileSize)
- .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(options));
+ .build(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ null,
+ null,
+ new HashMap<>(),
+ new CoreOptions(options));
}
private KeyValueFileReaderFactory createReaderFactory(
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 114ff92a3..3c6c19182 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -228,15 +228,24 @@ public class LookupLevelsTest {
private KeyValueFileWriterFactory createWriterFactory() {
Path path = new Path(tempDir.toUri().toString());
+ String identifier = "avro";
+ Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+ pathFactoryMap.put(identifier, new FileStorePathFactory(path));
return KeyValueFileWriterFactory.builder(
FileIOFinder.find(path),
0,
keyType,
rowType,
- new FlushingFileFormat("avro"),
- new FileStorePathFactory(path),
+ new FlushingFileFormat(identifier),
+ pathFactoryMap,
TARGET_FILE_SIZE.defaultValue().getBytes())
- .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new Options()));
+ .build(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ null,
+ null,
+ new HashMap<>(),
+ new CoreOptions(new Options()));
}
private KeyValueFileReaderFactory createReaderFactory() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index fd36afa8a..2983028b7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -105,6 +105,7 @@ public abstract class MergeTreeTestBase {
private KeyValueFileWriterFactory compactWriterFactory;
private MergeTreeWriter writer;
private SortEngine sortEngine;
+ private String identifier = "avro";
@BeforeEach
public void beforeEach() throws IOException {
@@ -113,7 +114,7 @@ public abstract class MergeTreeTestBase {
comparator = Comparator.comparingInt(o -> o.getInt(0));
sortEngine = getSortEngine();
recreateMergeTree(1024 * 1024);
- Path bucketDir = writerFactory.pathFactory().toPath("ignore").getParent();
+ Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
LocalFileIO.create().mkdirs(bucketDir);
}
@@ -142,7 +143,7 @@ public abstract class MergeTreeTestBase {
RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));
- FileFormat flushingAvro = new FlushingFileFormat("avro");
+ FileFormat flushingAvro = new FlushingFileFormat(identifier);
KeyValueFileReaderFactory.Builder readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
LocalFileIO.create(),
@@ -173,6 +174,9 @@ public abstract class MergeTreeTestBase {
});
readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
+
+ Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+ pathFactoryMap.put(identifier, pathFactory);
KeyValueFileWriterFactory.Builder writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
LocalFileIO.create(),
@@ -180,7 +184,7 @@ public abstract class MergeTreeTestBase {
keyType,
valueType,
flushingAvro,
- pathFactory,
+ pathFactoryMap,
options.targetFileSize());
writerFactory =
writerFactoryBuilder.build(
@@ -188,6 +192,7 @@ public abstract class MergeTreeTestBase {
0,
options.fileCompressionPerLevel(),
options.fileCompression(),
+ options.fileFormatPerLevel(),
options);
compactWriterFactory =
writerFactoryBuilder.build(
@@ -195,6 +200,7 @@ public abstract class MergeTreeTestBase {
0,
options.fileCompressionPerLevel(),
options.fileCompression(),
+ options.fileFormatPerLevel(),
options);
writer = createMergeTreeWriter(Collections.emptyList());
}
@@ -406,7 +412,7 @@ public abstract class MergeTreeTestBase {
writer.close();
- Path bucketDir = writerFactory.pathFactory().toPath("ignore").getParent();
+ Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
Set<String> files =
Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
.map(FileStatus::getPath)
@@ -502,7 +508,7 @@ public abstract class MergeTreeTestBase {
assertThat(remove).isTrue();
// See MergeTreeWriter.updateCompactResult
if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) {
- compactWriterFactory.deleteFile(file.fileName());
+ compactWriterFactory.deleteFile(file.fileName(), file.level());
}
}
compactedFiles.addAll(increment.compactIncrement().compactAfter());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 746d097ab..e38bea757 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -343,6 +343,24 @@ public class CatalogTableITCase extends CatalogITCaseBase {
"Can not set the write-mode to append-only and changelog-producer at the same time.");
}
+ @Test
+ public void testFileFormatPerLevel() {
+     // Four levels, with level 0 configured as avro and level 3 as parquet.
+     sql(
+             "CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING) "
+                     + "WITH ('num-sorted-run.compaction-trigger'='2',"
+                     + "'file.format.per.level' = '0:avro,3:parquet',"
+                     + " 'num-levels' = '4')");
+     sql("INSERT INTO T1 SELECT 1,'AAA'");
+     sql("INSERT INTO T1 SELECT 2,'BBB'");
+     sql("INSERT INTO T1 SELECT 3,'CCC'");
+
+     // All rows must be readable whichever format each underlying file uses.
+     List<Row> data = sql("SELECT * FROM T1");
+     assertThat(data)
+             .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"));
+
+     // The files table must list the configured format next to each level.
+     List<Row> levelFormats = sql("SELECT level,file_format FROM T1$files");
+     assertThat(levelFormats).containsExactlyInAnyOrder(Row.of(3, "parquet"), Row.of(0, "avro"));
+ }
+
@Test
public void testFilesTable() throws Exception {
sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 3106ed679..a8741b450 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -57,7 +57,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
@@ -171,6 +173,9 @@ public class TestChangelogDataReadWrite {
public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRow partition, int bucket) {
CoreOptions options =
new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
+
+ Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+ pathFactoryMap.put("avro", pathFactory);
RecordWriter<KeyValue> writer =
new KeyValueFileStoreWrite(
LocalFileIO.create(),
@@ -183,6 +188,7 @@ public class TestChangelogDataReadWrite {
() -> EQUALISER,
DeduplicateMergeFunction.factory(),
pathFactory,
+ pathFactoryMap,
snapshotManager,
null, // not used, we only create an empty writer
null,