You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/06 01:43:59 UTC
[flink-table-store] branch master updated: [FLINK-30569] File Format can not change with data file exists
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 63a27cd7 [FLINK-30569] File Format can not change with data file exists
63a27cd7 is described below
commit 63a27cd7af945839f67afaf6e946bcf617ad18a2
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jan 6 09:43:54 2023 +0800
[FLINK-30569] File Format can not change with data file exists
This closes #459
---
.../file/mergetree/MergeTreeBenchmark.java | 2 +-
.../source/TestChangelogDataReadWrite.java | 2 +-
.../org/apache/flink/table/store/CoreOptions.java | 4 ++
.../table/store/file/AppendOnlyFileStore.java | 3 +-
.../flink/table/store/file/KeyValueFileStore.java | 3 +-
.../table/store/file/io/DataFilePathFactory.java | 9 ++++
.../store/file/io/KeyValueFileReaderFactory.java | 31 +++++++++-----
.../file/operation/AppendOnlyFileStoreRead.java | 24 ++++++-----
.../file/operation/KeyValueFileStoreRead.java | 6 +--
.../file/operation/KeyValueFileStoreWrite.java | 3 +-
.../table/store/file/utils/BulkFormatMapping.java | 19 ++++----
.../table/store/format/FileFormatDiscover.java | 45 +++++++++++++++++++
.../apache/flink/table/store/format/FormatKey.java | 50 ++++++++++++++++++++++
.../store/file/io/KeyValueFileReadWriteTest.java | 4 +-
.../table/store/file/mergetree/MergeTreeTest.java | 2 +-
.../table/store/table/FileStoreTableTestBase.java | 35 +++++++++++++++
16 files changed, 202 insertions(+), 40 deletions(-)
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
index 70304ee3..a01ab499 100644
--- a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/src/main/java/org/apache/flink/table/store/benchmark/file/mergetree/MergeTreeBenchmark.java
@@ -144,7 +144,7 @@ public class MergeTreeBenchmark {
0,
keyType,
valueType,
- flushingFormat,
+ ignore -> flushingFormat,
pathFactory,
new KeyValueFieldsExtractor() {
@Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 3df55343..514ea55a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -127,7 +127,7 @@ public class TestChangelogDataReadWrite {
VALUE_TYPE,
COMPARATOR,
DeduplicateMergeFunction.factory(),
- avro,
+ ignore -> avro,
pathFactory,
EXTRACTOR);
return new KeyValueTableRead(read) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index ba61e105..368620d6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -405,6 +405,10 @@ public class CoreOptions implements Serializable {
this.options = options;
}
+ public Configuration toConfiguration() {
+ return options;
+ }
+
public Map<String, String> toMap() {
return options.toMap();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 0d889213..e59a82b6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreWrite;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.format.FileFormatDiscover;
import org.apache.flink.table.types.logical.RowType;
import java.util.Comparator;
@@ -54,7 +55,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
@Override
public AppendOnlyFileStoreRead newRead() {
return new AppendOnlyFileStoreRead(
- schemaManager, schemaId, rowType, options.fileFormat(), pathFactory());
+ schemaManager, schemaId, rowType, FileFormatDiscover.of(options), pathFactory());
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 780eabb4..06e88064 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
+import org.apache.flink.table.store.format.FileFormatDiscover;
import org.apache.flink.table.types.logical.RowType;
import java.util.Comparator;
@@ -77,7 +78,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
valueType,
newKeyComparator(),
mfFactory,
- options.fileFormat(),
+ FileFormatDiscover.of(options),
pathFactory(),
keyValueFieldsExtractor);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java
index f762963c..4c7019b0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFilePathFactory.java
@@ -73,4 +73,13 @@ public class DataFilePathFactory {
public static Path bucketPath(Path tablePath, String partition, int bucket) {
return new Path(tablePath + "/" + partition + "/bucket-" + bucket);
}
+
+ public static String formatIdentifier(String fileName) {
+ int index = fileName.lastIndexOf('.');
+ if (index == -1) {
+ throw new IllegalArgumentException(fileName + " is not a legal file name.");
+ }
+
+ return fileName.substring(index + 1);
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
index 424847dd..22152a74 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
@@ -27,7 +27,8 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.BulkFormatMapping;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
+import org.apache.flink.table.store.format.FormatKey;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.RowType;
@@ -48,7 +49,7 @@ public class KeyValueFileReaderFactory {
private final RowType valueType;
private final BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder;
- private final Map<Long, BulkFormatMapping> bulkFormatMappings;
+ private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final DataFilePathFactory pathFactory;
private KeyValueFileReaderFactory(
@@ -69,13 +70,15 @@ public class KeyValueFileReaderFactory {
public RecordReader<KeyValue> createRecordReader(long schemaId, String fileName, int level)
throws IOException {
+ String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);
BulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
- schemaId,
+ new FormatKey(schemaId, formatIdentifier),
key -> {
TableSchema tableSchema = schemaManager.schema(this.schemaId);
- TableSchema dataSchema = schemaManager.schema(key);
- return bulkFormatMappingBuilder.build(tableSchema, dataSchema);
+ TableSchema dataSchema = schemaManager.schema(key.schemaId);
+ return bulkFormatMappingBuilder.build(
+ formatIdentifier, tableSchema, dataSchema);
});
return new KeyValueDataFileRecordReader(
bulkFormatMapping.getReaderFactory(),
@@ -91,11 +94,17 @@ public class KeyValueFileReaderFactory {
long schemaId,
RowType keyType,
RowType valueType,
- FileFormat fileFormat,
+ FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor) {
return new Builder(
- schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory, extractor);
+ schemaManager,
+ schemaId,
+ keyType,
+ valueType,
+ formatDiscover,
+ pathFactory,
+ extractor);
}
/** Builder for {@link KeyValueFileReaderFactory}. */
@@ -105,7 +114,7 @@ public class KeyValueFileReaderFactory {
private final long schemaId;
private final RowType keyType;
private final RowType valueType;
- private final FileFormat fileFormat;
+ private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final KeyValueFieldsExtractor extractor;
@@ -120,14 +129,14 @@ public class KeyValueFileReaderFactory {
long schemaId,
RowType keyType,
RowType valueType,
- FileFormat fileFormat,
+ FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor) {
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
- this.fileFormat = fileFormat;
+ this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.extractor = extractor;
@@ -167,7 +176,7 @@ public class KeyValueFileReaderFactory {
projectedKeyType,
projectedValueType,
BulkFormatMapping.newBuilder(
- fileFormat, extractor, keyProjection, valueProjection, filters),
+ formatDiscover, extractor, keyProjection, valueProjection, filters),
pathFactory.createDataFilePathFactory(partition, bucket));
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 2dc3038a..3e99d8db 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -30,7 +30,8 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.BulkFormatMapping;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
+import org.apache.flink.table.store.format.FormatKey;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.RowType;
@@ -51,9 +52,9 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
private final SchemaManager schemaManager;
private final long schemaId;
private final RowType rowType;
- private final FileFormat fileFormat;
+ private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
- private final Map<Long, BulkFormatMapping> bulkFormatMappings;
+ private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private int[][] projection;
@@ -63,12 +64,12 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
SchemaManager schemaManager,
long schemaId,
RowType rowType,
- FileFormat fileFormat,
+ FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory) {
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.rowType = rowType;
- this.fileFormat = fileFormat;
+ this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.bulkFormatMappings = new HashMap<>();
@@ -92,12 +93,13 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
for (DataFileMeta file : split.files()) {
+ String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
BulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
- file.schemaId(),
+ new FormatKey(file.schemaId(), formatIdentifier),
key -> {
TableSchema tableSchema = schemaManager.schema(this.schemaId);
- TableSchema dataSchema = schemaManager.schema(key);
+ TableSchema dataSchema = schemaManager.schema(key.schemaId);
int[][] dataProjection =
SchemaEvolutionUtil.createDataProjection(
tableSchema.fields(),
@@ -111,7 +113,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
Projection.of(dataProjection).toTopLevelIndexes(),
dataSchema.fields());
List<Predicate> dataFilters =
- this.schemaId == key
+ this.schemaId == key.schemaId
? filters
: SchemaEvolutionUtil.createDataFilters(
tableSchema.fields(),
@@ -119,8 +121,10 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
filters);
return new BulkFormatMapping(
indexMapping,
- fileFormat.createReaderFactory(
- rowType, dataProjection, dataFilters));
+ formatDiscover
+ .discover(formatIdentifier)
+ .createReaderFactory(
+ rowType, dataProjection, dataFilters));
});
suppliers.add(
() ->
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 18633be8..84a939f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderUtils;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.utils.ProjectedRowData;
import org.apache.flink.table.types.logical.RowType;
@@ -83,7 +83,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
RowType valueType,
Comparator<RowData> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
- FileFormat fileFormat,
+ FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor) {
this.tableSchema = schemaManager.schema(schemaId);
@@ -93,7 +93,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
schemaId,
keyType,
valueType,
- fileFormat,
+ formatDiscover,
pathFactory,
extractor);
this.keyComparator = keyComparator;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index ec6f8453..ee245ab3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.format.FileFormatDiscover;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
@@ -89,7 +90,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
schemaId,
keyType,
valueType,
- options.fileFormat(),
+ FileFormatDiscover.of(options),
pathFactory,
extractor);
this.writerFactoryBuilder =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java
index 6f064e12..3e831c98 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/BulkFormatMapping.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatDiscover;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.RowType;
@@ -56,37 +56,38 @@ public class BulkFormatMapping {
}
public static BulkFormatMappingBuilder newBuilder(
- FileFormat fileFormat,
+ FileFormatDiscover formatDiscover,
KeyValueFieldsExtractor extractor,
int[][] keyProjection,
int[][] valueProjection,
@Nullable List<Predicate> filters) {
return new BulkFormatMappingBuilder(
- fileFormat, extractor, keyProjection, valueProjection, filters);
+ formatDiscover, extractor, keyProjection, valueProjection, filters);
}
/** Builder to build {@link BulkFormatMapping}. */
public static class BulkFormatMappingBuilder {
- private final FileFormat fileFormat;
+ private final FileFormatDiscover formatDiscover;
private final KeyValueFieldsExtractor extractor;
private final int[][] keyProjection;
private final int[][] valueProjection;
@Nullable private final List<Predicate> filters;
private BulkFormatMappingBuilder(
- FileFormat fileFormat,
+ FileFormatDiscover formatDiscover,
KeyValueFieldsExtractor extractor,
int[][] keyProjection,
int[][] valueProjection,
@Nullable List<Predicate> filters) {
- this.fileFormat = fileFormat;
+ this.formatDiscover = formatDiscover;
this.extractor = extractor;
this.keyProjection = keyProjection;
this.valueProjection = valueProjection;
this.filters = filters;
}
- public BulkFormatMapping build(TableSchema tableSchema, TableSchema dataSchema) {
+ public BulkFormatMapping build(
+ String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) {
List<DataField> tableKeyFields = extractor.keyFields(tableSchema);
List<DataField> tableValueFields = extractor.valueFields(tableSchema);
int[][] tableProjection =
@@ -146,7 +147,9 @@ public class BulkFormatMapping {
tableSchema.fields(), dataSchema.fields(), filters);
return new BulkFormatMapping(
indexMapping,
- fileFormat.createReaderFactory(dataRecordType, dataProjection, dataFilters));
+ formatDiscover
+ .discover(formatIdentifier)
+ .createReaderFactory(dataRecordType, dataProjection, dataFilters));
}
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FileFormatDiscover.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FileFormatDiscover.java
new file mode 100644
index 00000000..8a98573b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FileFormatDiscover.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format;
+
+import org.apache.flink.table.store.CoreOptions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Discovers the {@link FileFormat} that corresponds to a given format identifier. */
+public interface FileFormatDiscover {
+
+ static FileFormatDiscover of(CoreOptions options) {
+ Map<String, FileFormat> formats = new HashMap<>();
+ return new FileFormatDiscover() {
+
+ @Override
+ public FileFormat discover(String identifier) {
+ return formats.computeIfAbsent(identifier, this::create);
+ }
+
+ private FileFormat create(String identifier) {
+ return FileFormat.fromIdentifier(identifier, options.toConfiguration());
+ }
+ };
+ }
+
+ FileFormat discover(String identifier);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FormatKey.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FormatKey.java
new file mode 100644
index 00000000..42f7799c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/format/FormatKey.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format;
+
+import java.util.Objects;
+
+/** Key combining a schema id and a file format identifier, used when reading a file. */
+public class FormatKey {
+
+ public final long schemaId;
+ public final String format;
+
+ public FormatKey(long schemaId, String format) {
+ this.schemaId = schemaId;
+ this.format = format;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FormatKey formatKey = (FormatKey) o;
+ return schemaId == formatKey.schemaId && Objects.equals(format, formatKey.format);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaId, format);
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
index 74f80dfd..f39a5a9b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
@@ -68,7 +68,7 @@ public class KeyValueFileReadWriteTest {
public void testReadNonExistentFile() {
KeyValueFileReaderFactory readerFactory =
createReaderFactory(tempDir.toString(), "avro", null, null);
- assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file", 0))
+ assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 0))
.hasMessageContaining(
"you can configure 'snapshot.time-retained' option with a larger value.");
}
@@ -263,7 +263,7 @@ public class KeyValueFileReadWriteTest {
0,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
- new FlushingFileFormat(format),
+ ignore -> new FlushingFileFormat(format),
pathFactory,
new TestKeyValueGenerator.TestKeyValueFieldsExtractor());
if (keyProjection != null) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 0ef7fdd5..f80052c6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -141,7 +141,7 @@ public class MergeTreeTest {
0,
keyType,
valueType,
- flushingAvro,
+ ignore -> flushingAvro,
pathFactory,
new KeyValueFieldsExtractor() {
@Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 8965e97f..a19a4e79 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -69,6 +69,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.flink.table.store.CoreOptions.FILE_FORMAT;
import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
import static org.assertj.core.api.Assertions.assertThat;
@@ -143,6 +144,40 @@ public abstract class FileStoreTableTestBase {
assertThat(traceableFileSystem.openOutputStreams(pathPredicate)).isEmpty();
}
+ @Test
+ public void testChangeFormat() throws Exception {
+ FileStoreTable table = createFileStoreTable(conf -> conf.set(FILE_FORMAT, "orc"));
+
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+ commit.close();
+
+ assertThat(getResult(table.newRead(), table.newScan().plan().splits(), BATCH_ROW_TO_STRING))
+ .containsExactlyInAnyOrder(
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+ table = createFileStoreTable(conf -> conf.set(FILE_FORMAT, "avro"));
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(rowData(1, 11, 111L));
+ write.write(rowData(2, 22, 222L));
+ commit.commit(1, write.prepareCommit(true, 1));
+ write.close();
+ commit.close();
+
+ assertThat(getResult(table.newRead(), table.newScan().plan().splits(), BATCH_ROW_TO_STRING))
+ .containsExactlyInAnyOrder(
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "1|11|111|binary|varbinary|mapKey:mapVal|multiset",
+ "2|22|222|binary|varbinary|mapKey:mapVal|multiset");
+ }
+
@Test
public void testOverwrite() throws Exception {
FileStoreTable table = createFileStoreTable();