Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/15 10:44:08 UTC
[flink-table-store] branch master updated: [FLINK-27957] Extract AppendOnlyFileStore out of KeyValueFileStore
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a3e52f85 [FLINK-27957] Extract AppendOnlyFileStore out of KeyValueFileStore
a3e52f85 is described below
commit a3e52f85900758f6c6f4a751c1ac14d012e833a8
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Jun 15 18:44:03 2022 +0800
[FLINK-27957] Extract AppendOnlyFileStore out of KeyValueFileStore
This closes #148
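
At a glance, the patch replaces the single FileStoreImpl (which switched on WriteMode) with a small hierarchy that is generic in the record type it reads and writes. A minimal sketch of that hierarchy, condensed from the diff below (the comments are illustrative, not part of the committed sources):

    // Condensed from the interfaces and classes added in this patch (full sources below).
    public interface FileStore<T> extends Serializable {
        SnapshotManager snapshotManager();
        RowType partitionType();
        FileStoreScan newScan();
        FileStoreRead<T> newRead();    // readers produce records of type T
        FileStoreWrite<T> newWrite();  // writers consume records of type T
        FileStoreCommit newCommit();
        FileStoreExpire newExpire();
    }

    // AbstractFileStore<T>  -- shared path, snapshot, manifest, commit and expire plumbing
    // KeyValueFileStore     -- extends AbstractFileStore<KeyValue>; merge-tree scan/read/write
    // AppendOnlyFileStore   -- extends AbstractFileStore<RowData>;  append-only scan/read/write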
---
.../store/connector/sink/StoreSinkCompactor.java | 6 +-
.../store/connector/sink/StoreSinkWriter.java | 6 +-
.../table/store/connector/sink/StoreSinkTest.java | 13 +-
.../table/store/connector/sink/TestFileStore.java | 44 ++-
.../store/connector/sink/TestFileStoreTable.java | 19 +-
.../source/FileStoreSourceSplitReaderTest.java | 17 +-
.../source/TestChangelogDataReadWrite.java | 28 +-
.../flink/table/store/file/AbstractFileStore.java | 122 +++++++++
.../table/store/file/AppendOnlyFileStore.java | 72 +++++
.../apache/flink/table/store/file/FileStore.java | 24 +-
.../flink/table/store/file/FileStoreImpl.java | 295 ---------------------
.../apache/flink/table/store/file/KeyValue.java | 4 +
.../flink/table/store/file/KeyValueFileStore.java | 96 +++++++
.../table/store/file/data/AppendOnlyReader.java | 78 ++++++
.../file/{writer => data}/AppendOnlyWriter.java | 27 +-
.../store/file/mergetree/MergeTreeReader.java | 4 +-
.../store/file/mergetree/MergeTreeWriter.java | 9 +-
.../file/mergetree/compact/ConcatRecordReader.java | 21 +-
...oreScanImpl.java => AbstractFileStoreScan.java} | 38 +--
.../file/operation/AbstractFileStoreWrite.java | 66 +++++
.../file/operation/AppendOnlyFileStoreRead.java | 88 ++++++
.../file/operation/AppendOnlyFileStoreScan.java | 64 +++++
.../file/operation/AppendOnlyFileStoreWrite.java | 89 +++++++
.../table/store/file/operation/FileStoreRead.java | 35 +--
.../table/store/file/operation/FileStoreScan.java | 4 -
.../table/store/file/operation/FileStoreWrite.java | 15 +-
...oreReadImpl.java => KeyValueFileStoreRead.java} | 73 ++---
.../file/operation/KeyValueFileStoreScan.java | 63 +++++
...eWriteImpl.java => KeyValueFileStoreWrite.java} | 88 +-----
.../table/store/file/writer/CompactWriter.java | 7 +-
.../table/store/file/writer/RecordWriter.java | 8 +-
.../store/table/AppendOnlyFileStoreTable.java | 49 ++--
.../table/ChangelogValueCountFileStoreTable.java | 27 +-
.../table/ChangelogWithKeyFileStoreTable.java | 27 +-
.../{TableWrite.java => AbstractTableWrite.java} | 45 ++--
.../flink/table/store/table/sink/TableWrite.java | 124 +--------
.../{TableRead.java => KeyValueTableRead.java} | 26 +-
.../flink/table/store/table/source/TableRead.java | 47 +---
.../flink/table/store/table/source/TableScan.java | 2 +-
.../flink/table/store/file/TestFileStore.java | 22 +-
.../{writer => data}/AppendOnlyWriterTest.java | 23 +-
.../table/store/file/mergetree/MergeTreeTest.java | 5 +-
...eadTest.java => KeyValueFileStoreReadTest.java} | 6 +-
...canTest.java => KeyValueFileStoreScanTest.java} | 27 +-
.../store/file/operation/TestCommitThread.java | 6 +-
.../store/table/AppendOnlyFileStoreTableTest.java | 19 +-
.../ChangelogValueCountFileStoreTableTest.java | 23 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 22 +-
.../table/store/format/FileFormatSuffixTest.java | 6 +-
49 files changed, 1100 insertions(+), 929 deletions(-)
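
Most of the call-site churn in the test diffs below follows one pattern: RecordWriter is now generic, so merge-tree (key-value) writers take a KeyValue built with the new three-argument KeyValue.replace(key, valueKind, value) overload, while append-only writers take a RowData directly and only accept RowKind.INSERT rows. A hedged usage sketch of the two paths (the class is illustrative and writer construction is elided; row contents are placeholder values):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.KeyValue;
    import org.apache.flink.table.store.file.ValueKind;
    import org.apache.flink.table.store.file.writer.RecordWriter;

    // Illustrative only; not part of this patch.
    class WriteApiSketch {

        // Key-value path: previously writer.write(ValueKind.ADD, key, value).
        static void writeKeyValue(RecordWriter<KeyValue> writer) throws Exception {
            writer.write(
                    new KeyValue()
                            .replace(
                                    GenericRowData.of(1L),     // key
                                    ValueKind.ADD,             // ADD or DELETE
                                    GenericRowData.of(100L))); // value
        }

        // Append-only path: the row is written as-is and must be an INSERT.
        static void writeAppendOnly(RecordWriter<RowData> writer) throws Exception {
            writer.write(GenericRowData.of(1L, 100L));
        }
    }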
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
index d5fcf374..a0f7e1a3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
@@ -53,14 +53,14 @@ public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void>
private final int subTaskId;
private final int numOfParallelInstances;
- private final FileStore fileStore;
+ private final FileStore<?> fileStore;
private final Map<String, String> partitionSpec;
private final ExecutorService compactExecutor;
public StoreSinkCompactor(
int subTaskId,
int numOfParallelInstances,
- FileStore fileStore,
+ FileStore<?> fileStore,
Map<String, String> partitionSpec) {
this.subTaskId = subTaskId;
this.numOfParallelInstances = numOfParallelInstances;
@@ -120,7 +120,7 @@ public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void>
bucket,
subTaskId);
}
- RecordWriter writer =
+ RecordWriter<?> writer =
fileStore
.newWrite()
.createCompactWriter(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 8229ea55..c3c6a7db 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.log.LogWriteCallback;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
@@ -143,8 +144,9 @@ public class StoreSinkWriter<WriterStateT>
}
}
+ @SuppressWarnings("unchecked")
@VisibleForTesting
- Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
- return write.writers();
+ Map<BinaryRowData, Map<Integer, RecordWriter<?>>> writers() {
+ return ((AbstractTableWrite) write).writers();
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 2a6f03ea..edd19a5a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -31,12 +31,12 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
+import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -105,15 +105,8 @@ public class StoreSinkTest {
new HashMap<>(),
""));
- RowType keyType = hasPk ? schema.logicalTrimmedPrimaryKeysType() : schema.logicalRowType();
- RowType valueType =
- hasPk
- ? schema.logicalRowType()
- : new RowType(
- Collections.singletonList(
- new RowType.RowField("COUNT", new BigIntType(false))));
RowType partitionType = schema.logicalPartitionType();
- fileStore = new TestFileStore(hasPk, keyType, valueType, partitionType);
+ fileStore = new TestFileStore(hasPk, partitionType);
table = new TestFileStoreTable(fileStore, schema);
}
@@ -241,7 +234,7 @@ public class StoreSinkTest {
}
List<Committable> committables = ((StoreSinkWriter) writer).prepareCommit();
- Map<BinaryRowData, Map<Integer, RecordWriter>> writers =
+ Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers =
new HashMap<>(((StoreSinkWriter) writer).writers());
assertThat(writers.size()).isGreaterThan(0);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 639d5294..109c692c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.mergetree.Increment;
@@ -51,31 +51,27 @@ import static org.apache.flink.table.store.file.mergetree.compact.CompactManager
import static org.apache.flink.table.store.file.stats.StatsTestUtils.newEmptyTableStats;
/** Test {@link FileStore}. */
-public class TestFileStore implements FileStore {
+public class TestFileStore implements FileStore<KeyValue> {
public final Set<ManifestCommittable> committed = new HashSet<>();
public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles = new HashMap<>();
public final boolean hasPk;
- private final RowType keyType;
- private final RowType valueType;
private final RowType partitionType;
public boolean expired = false;
- public TestFileStore(boolean hasPk, RowType keyType, RowType valueType, RowType partitionType) {
+ public TestFileStore(boolean hasPk, RowType partitionType) {
this.hasPk = hasPk;
- this.keyType = keyType;
- this.valueType = valueType;
this.partitionType = partitionType;
}
@Override
- public FileStoreWrite newWrite() {
- return new FileStoreWrite() {
+ public FileStoreWrite<KeyValue> newWrite() {
+ return new FileStoreWrite<KeyValue>() {
@Override
- public RecordWriter createWriter(
+ public RecordWriter<KeyValue> createWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
TestRecordWriter writer = new TestRecordWriter(hasPk);
writer.records.addAll(
@@ -87,7 +83,7 @@ public class TestFileStore implements FileStore {
}
@Override
- public RecordWriter createEmptyWriter(
+ public RecordWriter<KeyValue> createEmptyWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
return new TestRecordWriter(hasPk);
}
@@ -104,7 +100,7 @@ public class TestFileStore implements FileStore {
}
@Override
- public FileStoreRead newRead() {
+ public FileStoreRead<KeyValue> newRead() {
throw new UnsupportedOperationException();
}
@@ -128,16 +124,6 @@ public class TestFileStore implements FileStore {
};
}
- @Override
- public RowType keyType() {
- return keyType;
- }
-
- @Override
- public RowType valueType() {
- return valueType;
- }
-
@Override
public RowType partitionType() {
return partitionType;
@@ -153,7 +139,7 @@ public class TestFileStore implements FileStore {
throw new UnsupportedOperationException();
}
- static class TestRecordWriter implements RecordWriter {
+ static class TestRecordWriter implements RecordWriter<KeyValue> {
final List<String> records = new ArrayList<>();
final boolean hasPk;
@@ -186,17 +172,17 @@ public class TestFileStore implements FileStore {
}
@Override
- public void write(ValueKind valueKind, RowData key, RowData value) {
+ public void write(KeyValue kv) {
if (!hasPk) {
- assert value.getArity() == 1;
- assert value.getLong(0) >= -1L;
+ assert kv.value().getArity() == 1;
+ assert kv.value().getLong(0) >= -1L;
}
records.add(
- valueKind.toString()
+ kv.valueKind().toString()
+ "-key-"
- + rowToString(key, true)
+ + rowToString(kv.key(), true)
+ "-value-"
- + rowToString(value, false));
+ + rowToString(kv.value(), false));
}
@Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index 6a870f1f..49a387ac 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -19,12 +19,13 @@
package org.apache.flink.table.store.connector.sink;
import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableCommit;
@@ -71,22 +72,24 @@ public class TestFileStoreTable implements FileStoreTable {
@Override
public TableWrite newWrite() {
- return new TableWrite(store.newWrite(), new SinkRecordConverter(2, schema)) {
+ return new AbstractTableWrite<KeyValue>(
+ store.newWrite(), new SinkRecordConverter(2, schema)) {
@Override
- protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
throws Exception {
boolean isInsert =
record.row().getRowKind() == RowKind.INSERT
|| record.row().getRowKind() == RowKind.UPDATE_AFTER;
+ KeyValue kv = new KeyValue();
if (store.hasPk) {
- writer.write(
- isInsert ? ValueKind.ADD : ValueKind.DELETE,
+ kv.replace(
record.primaryKey(),
+ isInsert ? ValueKind.ADD : ValueKind.DELETE,
record.row());
} else {
- writer.write(
- ValueKind.ADD, record.row(), GenericRowData.of(isInsert ? 1L : -1L));
+ kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(isInsert ? 1L : -1L));
}
+ writer.write(kv);
}
};
}
@@ -97,7 +100,7 @@ public class TestFileStoreTable implements FileStoreTable {
}
@Override
- public FileStore store() {
+ public TestFileStore store() {
return store;
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index cfe688bc..9e3e17ae 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -135,11 +136,21 @@ public class FileStoreSourceSplitReaderTest {
new FileStoreSourceSplitReader(rw.createReadWithKey().withIncremental(true));
List<Tuple2<Long, Long>> input = kvs();
- RecordWriter writer = rw.createMergeTreeWriter(row(1), 0);
+ RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0);
for (Tuple2<Long, Long> tuple2 : input) {
- writer.write(ValueKind.ADD, GenericRowData.of(tuple2.f0), GenericRowData.of(tuple2.f1));
+ writer.write(
+ new KeyValue()
+ .replace(
+ GenericRowData.of(tuple2.f0),
+ ValueKind.ADD,
+ GenericRowData.of(tuple2.f1)));
}
- writer.write(ValueKind.DELETE, GenericRowData.of(222L), GenericRowData.of(333L));
+ writer.write(
+ new KeyValue()
+ .replace(
+ GenericRowData.of(222L),
+ ValueKind.DELETE,
+ GenericRowData.of(333L)));
List<DataFileMeta> files = writer.prepareCommit().newFiles();
writer.close();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 4aa5f332..d4d26f50 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -27,19 +27,18 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
-import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -100,18 +99,17 @@ public class TestChangelogDataReadWrite {
private TableRead createRead(
Function<RecordReader.RecordIterator<KeyValue>, RecordReader.RecordIterator<RowData>>
rowDataIteratorCreator) {
- FileStoreRead read =
- new FileStoreReadImpl(
+ KeyValueFileStoreRead read =
+ new KeyValueFileStoreRead(
new SchemaManager(tablePath),
0,
- WriteMode.CHANGE_LOG,
KEY_TYPE,
VALUE_TYPE,
COMPARATOR,
new DeduplicateMergeFunction(),
avro,
pathFactory);
- return new TableRead(read) {
+ return new KeyValueTableRead(read) {
@Override
public TableRead withProjection(int[][] projection) {
throw new UnsupportedOperationException();
@@ -135,19 +133,23 @@ public class TestChangelogDataReadWrite {
BinaryRowData partition, int bucket, List<Tuple2<Long, Long>> kvs) throws Exception {
Preconditions.checkNotNull(
service, "ExecutorService must be provided if writeFiles is needed");
- RecordWriter writer = createMergeTreeWriter(partition, bucket);
+ RecordWriter<KeyValue> writer = createMergeTreeWriter(partition, bucket);
for (Tuple2<Long, Long> tuple2 : kvs) {
- writer.write(ValueKind.ADD, GenericRowData.of(tuple2.f0), GenericRowData.of(tuple2.f1));
+ writer.write(
+ new KeyValue()
+ .replace(
+ GenericRowData.of(tuple2.f0),
+ ValueKind.ADD,
+ GenericRowData.of(tuple2.f1)));
}
List<DataFileMeta> files = writer.prepareCommit().newFiles();
writer.close();
return new ArrayList<>(files);
}
- public RecordWriter createMergeTreeWriter(BinaryRowData partition, int bucket) {
+ public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
MergeTreeOptions options = new MergeTreeOptions(new Configuration());
- return new FileStoreWriteImpl(
- WriteMode.CHANGE_LOG,
+ return new KeyValueFileStoreWrite(
new SchemaManager(tablePath),
0,
KEY_TYPE,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
new file mode 100644
index 00000000..4e3338f5
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
+import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Base {@link FileStore} implementation.
+ *
+ * @param <T> type of record to read and write.
+ */
+public abstract class AbstractFileStore<T> implements FileStore<T> {
+
+ protected final SchemaManager schemaManager;
+ protected final long schemaId;
+ protected final FileStoreOptions options;
+ protected final String user;
+ protected final RowType partitionType;
+
+ public AbstractFileStore(
+ SchemaManager schemaManager,
+ long schemaId,
+ FileStoreOptions options,
+ String user,
+ RowType partitionType) {
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
+ this.options = options;
+ this.user = user;
+ this.partitionType = partitionType;
+ }
+
+ public FileStorePathFactory pathFactory() {
+ return new FileStorePathFactory(
+ options.path(),
+ partitionType,
+ options.partitionDefaultName(),
+ options.fileFormat().getFormatIdentifier());
+ }
+
+ @Override
+ public SnapshotManager snapshotManager() {
+ return new SnapshotManager(options.path());
+ }
+
+ @VisibleForTesting
+ public ManifestFile.Factory manifestFileFactory() {
+ return new ManifestFile.Factory(
+ schemaManager,
+ schemaId,
+ partitionType,
+ options.manifestFormat(),
+ pathFactory(),
+ options.manifestTargetSize().getBytes());
+ }
+
+ @VisibleForTesting
+ public ManifestList.Factory manifestListFactory() {
+ return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
+ }
+
+ @Override
+ public RowType partitionType() {
+ return partitionType;
+ }
+
+ public FileStoreOptions options() {
+ return options;
+ }
+
+ @Override
+ public FileStoreCommitImpl newCommit() {
+ return new FileStoreCommitImpl(
+ schemaId,
+ user,
+ partitionType,
+ pathFactory(),
+ snapshotManager(),
+ manifestFileFactory(),
+ manifestListFactory(),
+ newScan(),
+ options.bucket(),
+ options.manifestTargetSize(),
+ options.manifestMergeMinCount());
+ }
+
+ @Override
+ public FileStoreExpireImpl newExpire() {
+ return new FileStoreExpireImpl(
+ options.snapshotNumRetainMin(),
+ options.snapshotNumRetainMax(),
+ options.snapshotTimeRetain().toMillis(),
+ pathFactory(),
+ snapshotManager(),
+ manifestFileFactory(),
+ manifestListFactory());
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
new file mode 100644
index 00000000..d4c0ac43
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStore} for reading and writing {@link RowData}. */
+public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
+
+ private final RowType rowType;
+
+ public AppendOnlyFileStore(
+ SchemaManager schemaManager,
+ long schemaId,
+ FileStoreOptions options,
+ String user,
+ RowType partitionType,
+ RowType rowType) {
+ super(schemaManager, schemaId, options, user, partitionType);
+ this.rowType = rowType;
+ }
+
+ @Override
+ public AppendOnlyFileStoreScan newScan() {
+ return new AppendOnlyFileStoreScan(
+ partitionType,
+ rowType,
+ snapshotManager(),
+ manifestFileFactory(),
+ manifestListFactory(),
+ options.bucket());
+ }
+
+ @Override
+ public AppendOnlyFileStoreRead newRead() {
+ return new AppendOnlyFileStoreRead(
+ schemaManager, schemaId, rowType, options.fileFormat(), pathFactory());
+ }
+
+ @Override
+ public AppendOnlyFileStoreWrite newWrite() {
+ return new AppendOnlyFileStoreWrite(
+ schemaId,
+ rowType,
+ options.fileFormat(),
+ pathFactory(),
+ snapshotManager(),
+ newScan(),
+ options.mergeTreeOptions().targetFileSize);
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index c6c205ff..9df275aa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -28,24 +28,24 @@ import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
-/** File store interface. */
-public interface FileStore extends Serializable {
-
- FileStoreWrite newWrite();
-
- FileStoreRead newRead();
+/**
+ * File store interface.
+ *
+ * @param <T> type of record to read and write.
+ */
+public interface FileStore<T> extends Serializable {
- FileStoreCommit newCommit();
+ SnapshotManager snapshotManager();
- FileStoreExpire newExpire();
+ RowType partitionType();
FileStoreScan newScan();
- SnapshotManager snapshotManager();
+ FileStoreRead<T> newRead();
- RowType keyType();
+ FileStoreWrite<T> newWrite();
- RowType valueType();
+ FileStoreCommit newCommit();
- RowType partitionType();
+ FileStoreExpire newExpire();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
deleted file mode 100644
index f391f3b4..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
-import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
-import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
-import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
-import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-
-import javax.annotation.Nullable;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-/** File store implementation. */
-public class FileStoreImpl implements FileStore {
-
- private final SchemaManager schemaManager;
- private final long schemaId;
- private final WriteMode writeMode;
- private final FileStoreOptions options;
- private final String user;
- private final RowType partitionType;
- private final RowType keyType;
- private final RowType valueType;
- private final Supplier<Comparator<RowData>> keyComparatorSupplier;
- @Nullable private final MergeFunction mergeFunction;
-
- public FileStoreImpl(
- SchemaManager schemaManager,
- long schemaId,
- FileStoreOptions options,
- WriteMode writeMode,
- String user,
- RowType partitionType,
- RowType keyType,
- RowType valueType,
- @Nullable MergeFunction mergeFunction) {
- this.schemaManager = schemaManager;
- this.schemaId = schemaId;
- this.options = options;
- this.writeMode = writeMode;
- this.user = user;
- this.partitionType = partitionType;
- this.keyType = keyType;
- this.valueType = valueType;
- this.mergeFunction = mergeFunction;
- this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
- }
-
- public FileStorePathFactory pathFactory() {
- return new FileStorePathFactory(
- options.path(),
- partitionType,
- options.partitionDefaultName(),
- options.fileFormat().getFormatIdentifier());
- }
-
- public FileStoreOptions options() {
- return options;
- }
-
- @VisibleForTesting
- public ManifestFile.Factory manifestFileFactory() {
- return new ManifestFile.Factory(
- schemaManager,
- schemaId,
- partitionType,
- options.manifestFormat(),
- pathFactory(),
- options.manifestTargetSize().getBytes());
- }
-
- @VisibleForTesting
- public ManifestList.Factory manifestListFactory() {
- return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
- }
-
- @Override
- public FileStoreWriteImpl newWrite() {
- return new FileStoreWriteImpl(
- writeMode,
- schemaManager,
- schemaId,
- keyType,
- valueType,
- keyComparatorSupplier,
- mergeFunction,
- options.fileFormat(),
- pathFactory(),
- snapshotManager(),
- newScan(),
- options.mergeTreeOptions());
- }
-
- @Override
- public FileStoreReadImpl newRead() {
- return new FileStoreReadImpl(
- schemaManager,
- schemaId,
- writeMode,
- keyType,
- valueType,
- keyComparatorSupplier.get(),
- mergeFunction,
- options.fileFormat(),
- pathFactory());
- }
-
- @Override
- public FileStoreCommitImpl newCommit() {
- return new FileStoreCommitImpl(
- schemaId,
- user,
- partitionType,
- pathFactory(),
- snapshotManager(),
- manifestFileFactory(),
- manifestListFactory(),
- newScan(),
- options.bucket(),
- options.manifestTargetSize(),
- options.manifestMergeMinCount());
- }
-
- @Override
- public FileStoreExpireImpl newExpire() {
- return new FileStoreExpireImpl(
- options.snapshotNumRetainMin(),
- options.snapshotNumRetainMax(),
- options.snapshotTimeRetain().toMillis(),
- pathFactory(),
- snapshotManager(),
- manifestFileFactory(),
- manifestListFactory());
- }
-
- @Override
- public FileStoreScanImpl newScan() {
- return new FileStoreScanImpl(
- partitionType,
- keyType,
- valueType,
- snapshotManager(),
- manifestFileFactory(),
- manifestListFactory(),
- options.bucket());
- }
-
- @Override
- public SnapshotManager snapshotManager() {
- return new SnapshotManager(options.path());
- }
-
- @Override
- public RowType keyType() {
- return keyType;
- }
-
- @Override
- public RowType valueType() {
- return valueType;
- }
-
- @Override
- public RowType partitionType() {
- return partitionType;
- }
-
- public static FileStoreImpl createWithAppendOnly(
- SchemaManager schemaManager,
- long schemaId,
- FileStoreOptions options,
- String user,
- RowType partitionType,
- RowType rowType) {
- return new FileStoreImpl(
- schemaManager,
- schemaId,
- options,
- WriteMode.APPEND_ONLY,
- user,
- partitionType,
- RowType.of(),
- rowType,
- null);
- }
-
- public static FileStoreImpl createWithPrimaryKey(
- SchemaManager schemaManager,
- long schemaId,
- FileStoreOptions options,
- String user,
- RowType partitionType,
- RowType primaryKeyType,
- RowType rowType,
- FileStoreOptions.MergeEngine mergeEngine) {
- // add _KEY_ prefix to avoid conflict with value
- RowType keyType =
- new RowType(
- primaryKeyType.getFields().stream()
- .map(
- f ->
- new RowType.RowField(
- "_KEY_" + f.getName(),
- f.getType(),
- f.getDescription().orElse(null)))
- .collect(Collectors.toList()));
-
- MergeFunction mergeFunction;
- switch (mergeEngine) {
- case DEDUPLICATE:
- mergeFunction = new DeduplicateMergeFunction();
- break;
- case PARTIAL_UPDATE:
- List<LogicalType> fieldTypes = rowType.getChildren();
- RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
- for (int i = 0; i < fieldTypes.size(); i++) {
- fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
- }
- mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
- }
-
- return new FileStoreImpl(
- schemaManager,
- schemaId,
- options,
- WriteMode.CHANGE_LOG,
- user,
- partitionType,
- keyType,
- rowType,
- mergeFunction);
- }
-
- public static FileStoreImpl createWithValueCount(
- SchemaManager schemaManager,
- long schemaId,
- FileStoreOptions options,
- String user,
- RowType partitionType,
- RowType rowType) {
- RowType countType =
- RowType.of(
- new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
- MergeFunction mergeFunction = new ValueCountMergeFunction();
- return new FileStoreImpl(
- schemaManager,
- schemaId,
- options,
- WriteMode.CHANGE_LOG,
- user,
- partitionType,
- rowType,
- countType,
- mergeFunction);
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index a4703760..a5baa708 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -46,6 +46,10 @@ public class KeyValue {
return this;
}
+ public KeyValue replace(RowData key, ValueKind valueKind, RowData value) {
+ return replace(key, -1, valueKind, value);
+ }
+
public KeyValue replace(RowData key, long sequenceNumber, ValueKind valueKind, RowData value) {
this.key = key;
this.sequenceNumber = sequenceNumber;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
new file mode 100644
index 00000000..287f25d4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+/** {@link FileStore} for querying and updating {@link KeyValue}s. */
+public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
+
+ private final RowType keyType;
+ private final RowType valueType;
+ private final Supplier<Comparator<RowData>> keyComparatorSupplier;
+ private final MergeFunction mergeFunction;
+
+ public KeyValueFileStore(
+ SchemaManager schemaManager,
+ long schemaId,
+ FileStoreOptions options,
+ String user,
+ RowType partitionType,
+ RowType keyType,
+ RowType valueType,
+ MergeFunction mergeFunction) {
+ super(schemaManager, schemaId, options, user, partitionType);
+ this.keyType = keyType;
+ this.valueType = valueType;
+ this.mergeFunction = mergeFunction;
+ this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
+ }
+
+ @Override
+ public KeyValueFileStoreScan newScan() {
+ return new KeyValueFileStoreScan(
+ partitionType,
+ keyType,
+ snapshotManager(),
+ manifestFileFactory(),
+ manifestListFactory(),
+ options.bucket());
+ }
+
+ @Override
+ public KeyValueFileStoreRead newRead() {
+ return new KeyValueFileStoreRead(
+ schemaManager,
+ schemaId,
+ keyType,
+ valueType,
+ keyComparatorSupplier.get(),
+ mergeFunction,
+ options.fileFormat(),
+ pathFactory());
+ }
+
+ @Override
+ public KeyValueFileStoreWrite newWrite() {
+ return new KeyValueFileStoreWrite(
+ schemaManager,
+ schemaId,
+ keyType,
+ valueType,
+ keyComparatorSupplier,
+ mergeFunction,
+ options.fileFormat(),
+ pathFactory(),
+ snapshotManager(),
+ newScan(),
+ options.mergeTreeOptions());
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
new file mode 100644
index 00000000..86ffaeed
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** Reads {@link RowData} from data files. */
+public class AppendOnlyReader implements RecordReader<RowData> {
+
+ private final BulkFormat.Reader<RowData> reader;
+
+ public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory)
+ throws IOException {
+ long fileSize = FileUtils.getFileSize(path);
+ FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+ this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<RowData> readBatch() throws IOException {
+ BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
+ return iterator == null ? null : new AppendOnlyRecordIterator(iterator);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ private static class AppendOnlyRecordIterator implements RecordReader.RecordIterator<RowData> {
+
+ private final BulkFormat.RecordIterator<RowData> iterator;
+
+ private AppendOnlyRecordIterator(BulkFormat.RecordIterator<RowData> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public RowData next() throws IOException {
+ RecordAndPosition<RowData> result = iterator.next();
+
+ // TODO schema evolution
+ return result == null ? null : result.getRecord();
+ }
+
+ @Override
+ public void releaseBatch() {
+ iterator.releaseBatch();
+ }
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
similarity index 86%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index a3842c77..3c3f9e7e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -17,19 +17,23 @@
* under the License.
*/
-package org.apache.flink.table.store.file.writer;
+package org.apache.flink.table.store.file.data;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.stats.BinaryTableStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.writer.BaseFileWriter;
+import org.apache.flink.table.store.file.writer.FileWriter;
+import org.apache.flink.table.store.file.writer.Metric;
+import org.apache.flink.table.store.file.writer.MetricFileWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.writer.RollingFileWriter;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -42,7 +46,7 @@ import java.util.function.Supplier;
* A {@link RecordWriter} implementation that only accepts records which are always insert
* operations and don't have any unique keys or sort keys.
*/
-public class AppendOnlyWriter implements RecordWriter {
+public class AppendOnlyWriter implements RecordWriter<RowData> {
private final long schemaId;
private final long targetFileSize;
private final DataFilePathFactory pathFactory;
@@ -78,13 +82,12 @@ public class AppendOnlyWriter implements RecordWriter {
}
@Override
- public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+ public void write(RowData rowData) throws Exception {
Preconditions.checkArgument(
- valueKind == ValueKind.ADD,
- "Append-only writer cannot accept ValueKind: %s",
- valueKind);
-
- writer.write(value);
+ rowData.getRowKind() == RowKind.INSERT,
+ "Append-only writer can only accept insert row kind, but current row kind is: %s",
+ rowData.getRowKind());
+ writer.write(rowData);
}
@Override
@@ -99,6 +102,8 @@ public class AppendOnlyWriter implements RecordWriter {
writer = createRollingRowWriter();
}
+ System.out.println(newFiles);
+
return Increment.forAppend(newFiles);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index 66fe012a..fbf17b83 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -52,7 +52,7 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
throws IOException {
this.dropDelete = dropDelete;
- List<ReaderSupplier> readers = new ArrayList<>();
+ List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
for (List<SortedRun> section : sections) {
readers.add(
() ->
@@ -117,7 +117,7 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
public static RecordReader<KeyValue> readerForRun(SortedRun run, DataFileReader dataFileReader)
throws IOException {
- List<ReaderSupplier> readers = new ArrayList<>();
+ List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
for (DataFileMeta file : run.files()) {
readers.add(() -> dataFileReader.read(file.fileName()));
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 0c62407d..677b02ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -40,7 +39,7 @@ import java.util.Set;
import java.util.stream.Collectors;
/** A {@link RecordWriter} to write records and generate {@link Increment}. */
-public class MergeTreeWriter implements RecordWriter {
+public class MergeTreeWriter implements RecordWriter<KeyValue> {
private final MemTable memTable;
@@ -100,12 +99,12 @@ public class MergeTreeWriter implements RecordWriter {
}
@Override
- public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+ public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber();
- boolean success = memTable.put(sequenceNumber, valueKind, key, value);
+ boolean success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
flush();
- success = memTable.put(sequenceNumber, valueKind, key, value);
+ success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
index b9b86053..3cafd7a5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.mergetree.compact;
-import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.util.Preconditions;
@@ -34,29 +33,29 @@ import java.util.Queue;
* input list is already sorted by key and sequence number, and the key intervals do not overlap
* each other.
*/
-public class ConcatRecordReader implements RecordReader<KeyValue> {
+public class ConcatRecordReader<T> implements RecordReader<T> {
- private final Queue<ReaderSupplier> queue;
+ private final Queue<ReaderSupplier<T>> queue;
- private RecordReader<KeyValue> current;
+ private RecordReader<T> current;
- protected ConcatRecordReader(List<ReaderSupplier> readerFactories) {
+ protected ConcatRecordReader(List<ReaderSupplier<T>> readerFactories) {
readerFactories.forEach(
supplier ->
Preconditions.checkNotNull(supplier, "Reader factory must not be null."));
this.queue = new LinkedList<>(readerFactories);
}
- public static RecordReader<KeyValue> create(List<ReaderSupplier> readers) throws IOException {
- return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader(readers);
+ public static <R> RecordReader<R> create(List<ReaderSupplier<R>> readers) throws IOException {
+ return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader<>(readers);
}
@Nullable
@Override
- public RecordIterator<KeyValue> readBatch() throws IOException {
+ public RecordIterator<T> readBatch() throws IOException {
while (true) {
if (current != null) {
- RecordIterator<KeyValue> iterator = current.readBatch();
+ RecordIterator<T> iterator = current.readBatch();
if (iterator != null) {
return iterator;
}
@@ -79,7 +78,7 @@ public class ConcatRecordReader implements RecordReader<KeyValue> {
/** Supplier to get {@link RecordReader}. */
@FunctionalInterface
- public interface ReaderSupplier {
- RecordReader<KeyValue> get() throws IOException;
+ public interface ReaderSupplier<T> {
+ RecordReader<T> get() throws IOException;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
similarity index 89%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 873bcf2a..02d28007 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -46,11 +46,9 @@ import java.util.function.Function;
import java.util.stream.Collectors;
/** Default implementation of {@link FileStoreScan}. */
-public class FileStoreScanImpl implements FileStoreScan {
+public abstract class AbstractFileStoreScan implements FileStoreScan {
private final FieldStatsArraySerializer partitionStatsConverter;
- private final FieldStatsArraySerializer keyStatsConverter;
- private final FieldStatsArraySerializer valueStatsConverter;
private final RowDataToObjectArrayConverter partitionConverter;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
@@ -58,25 +56,19 @@ public class FileStoreScanImpl implements FileStoreScan {
private final int numOfBuckets;
private Predicate partitionFilter;
- private Predicate keyFilter;
- private Predicate valueFilter;
private Long specifiedSnapshotId = null;
private Integer specifiedBucket = null;
private List<ManifestFileMeta> specifiedManifests = null;
private boolean isIncremental = false;
- public FileStoreScanImpl(
+ public AbstractFileStoreScan(
RowType partitionType,
- RowType keyType,
- RowType valueType,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets) {
this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
- this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
- this.valueStatsConverter = new FieldStatsArraySerializer(valueType);
this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
this.snapshotManager = snapshotManager;
this.manifestFileFactory = manifestFileFactory;
@@ -117,18 +109,6 @@ public class FileStoreScanImpl implements FileStoreScan {
}
}
- @Override
- public FileStoreScan withKeyFilter(Predicate predicate) {
- this.keyFilter = predicate;
- return this;
- }
-
- @Override
- public FileStoreScan withValueFilter(Predicate predicate) {
- this.valueFilter = predicate;
- return this;
- }
-
@Override
public FileStoreScan withBucket(int bucket) {
this.specifiedBucket = bucket;
@@ -250,6 +230,10 @@ public class FileStoreScanImpl implements FileStoreScan {
}
private boolean filterManifestEntry(ManifestEntry entry) {
+ return filterByPartitionAndBucket(entry) && filterByStats(entry);
+ }
+
+ private boolean filterByPartitionAndBucket(ManifestEntry entry) {
if (specifiedBucket != null) {
Preconditions.checkState(
specifiedBucket < entry.totalBuckets(),
@@ -257,17 +241,11 @@ public class FileStoreScanImpl implements FileStoreScan {
}
return (partitionFilter == null
|| partitionFilter.test(partitionConverter.convert(entry.partition())))
- && (keyFilter == null
- || keyFilter.test(
- entry.file().rowCount(),
- entry.file().keyStats().fields(keyStatsConverter)))
- && (valueFilter == null
- || valueFilter.test(
- entry.file().rowCount(),
- entry.file().valueStats().fields(valueStatsConverter)))
&& (specifiedBucket == null || entry.bucket() == specifiedBucket);
}
+ protected abstract boolean filterByStats(ManifestEntry entry);
+
private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
return manifestFileFactory.create().read(manifest.fileName());
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
new file mode 100644
index 00000000..c3e09745
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Base {@link FileStoreWrite} implementation.
+ *
+ * @param <T> type of record to write.
+ */
+public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
+
+ private final SnapshotManager snapshotManager;
+ private final FileStoreScan scan;
+
+ protected AbstractFileStoreWrite(SnapshotManager snapshotManager, FileStoreScan scan) {
+ this.snapshotManager = snapshotManager;
+ this.scan = scan;
+ }
+
+ protected List<DataFileMeta> scanExistingFileMetas(BinaryRowData partition, int bucket) {
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ List<DataFileMeta> existingFileMetas = Lists.newArrayList();
+ if (latestSnapshotId != null) {
+ // Concat all the DataFileMeta of existing files into existingFileMetas.
+ scan.withSnapshot(latestSnapshotId)
+ .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
+ .plan().files().stream()
+ .map(ManifestEntry::file)
+ .forEach(existingFileMetas::add);
+ }
+ return existingFileMetas;
+ }
+
+ protected long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+ return fileMetas.stream()
+ .map(DataFileMeta::maxSequenceNumber)
+ .max(Long::compare)
+ .orElse(-1L);
+ }
+}
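
Note: the two protected helpers above are shared by the concrete write implementations later in this diff. A minimal sketch of how a subclass is expected to use them (fragment only; `partition` and `bucket` are assumed method parameters, and the surrounding class is hypothetical):

    // inside a hypothetical subclass of AbstractFileStoreWrite
    List<DataFileMeta> restoredFiles = scanExistingFileMetas(partition, bucket);
    long maxSeqNum = getMaxSequenceNumber(restoredFiles); // -1L when there is no snapshot or no files
    // a concrete RecordWriter is then created so new records continue after maxSeqNum
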
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
new file mode 100644
index 00000000..d8a77d63
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.AppendOnlyReader;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link FileStoreRead} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
+
+ private final SchemaManager schemaManager;
+ private final long schemaId;
+ private final RowType rowType;
+ private final FileFormat fileFormat;
+ private final FileStorePathFactory pathFactory;
+
+ private int[][] projection;
+
+ public AppendOnlyFileStoreRead(
+ SchemaManager schemaManager,
+ long schemaId,
+ RowType rowType,
+ FileFormat fileFormat,
+ FileStorePathFactory pathFactory) {
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
+ this.rowType = rowType;
+ this.fileFormat = fileFormat;
+ this.pathFactory = pathFactory;
+
+ this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes();
+ }
+
+ public FileStoreRead<RowData> withProjection(int[][] projectedFields) {
+ projection = projectedFields;
+ return this;
+ }
+
+ @Override
+ public RecordReader<RowData> createReader(
+ BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+ BulkFormat<RowData, FileSourceSplit> readerFactory =
+ fileFormat.createReaderFactory(rowType, projection);
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(partition, bucket);
+ List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
+ for (DataFileMeta file : files) {
+ suppliers.add(
+ () ->
+ new AppendOnlyReader(
+ dataFilePathFactory.toPath(file.fileName()), readerFactory));
+ }
+
+ return ConcatRecordReader.create(suppliers);
+ }
+}
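
A short usage sketch for the new read path (identifiers such as `read`, `partition`, `bucket` and `files` are assumed to come from an AppendOnlyFileStore and its scan plan):

    // push the projection into the format reader, then concatenate the per-file readers
    read.withProjection(new int[][] {{2}, {0}});   // keep only fields #2 and #0, in that order
    RecordReader<RowData> reader = read.createReader(partition, bucket, files);
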
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
new file mode 100644
index 00000000..521a91e4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreScan} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
+
+ private final FieldStatsArraySerializer rowStatsConverter;
+
+ private Predicate filter;
+
+ public AppendOnlyFileStoreScan(
+ RowType partitionType,
+ RowType rowType,
+ SnapshotManager snapshotManager,
+ ManifestFile.Factory manifestFileFactory,
+ ManifestList.Factory manifestListFactory,
+ int numOfBuckets) {
+ super(
+ partitionType,
+ snapshotManager,
+ manifestFileFactory,
+ manifestListFactory,
+ numOfBuckets);
+ this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
+ }
+
+ public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
+ this.filter = predicate;
+ return this;
+ }
+
+ @Override
+ protected boolean filterByStats(ManifestEntry entry) {
+ return filter == null
+ || filter.test(
+ entry.file().rowCount(),
+ entry.file().valueStats().fields(rowStatsConverter));
+ }
+}
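
For the append-only scan, filter push-down works on the per-file row stats only. A hedged usage sketch (`store` is an AppendOnlyFileStore, `rowPredicate` an assumed Predicate over the row fields):

    AppendOnlyFileStoreScan scan = store.newScan();
    scan.withFilter(rowPredicate);                 // checked against each file's valueStats
    List<ManifestEntry> entries = scan.plan().files();
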
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
new file mode 100644
index 00000000..76a780f9
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.AppendOnlyWriter;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
+
+ private final long schemaId;
+ private final RowType rowType;
+ private final FileFormat fileFormat;
+ private final FileStorePathFactory pathFactory;
+ private final long targetFileSize;
+
+ public AppendOnlyFileStoreWrite(
+ long schemaId,
+ RowType rowType,
+ FileFormat fileFormat,
+ FileStorePathFactory pathFactory,
+ SnapshotManager snapshotManager,
+ FileStoreScan scan,
+ long targetFileSize) {
+ super(snapshotManager, scan);
+ this.schemaId = schemaId;
+ this.rowType = rowType;
+ this.fileFormat = fileFormat;
+ this.pathFactory = pathFactory;
+ this.targetFileSize = targetFileSize;
+ }
+
+ @Override
+ public RecordWriter<RowData> createWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+ return createWriter(
+ partition, bucket, getMaxSequenceNumber(scanExistingFileMetas(partition, bucket)));
+ }
+
+ @Override
+ public RecordWriter<RowData> createEmptyWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+ return createWriter(partition, bucket, -1L);
+ }
+
+ @Override
+ public RecordWriter<RowData> createCompactWriter(
+ BinaryRowData partition,
+ int bucket,
+ ExecutorService compactExecutor,
+ List<DataFileMeta> restoredFiles) {
+ throw new UnsupportedOperationException(
+ "Currently append only write mode does not support compaction.");
+ }
+
+ private RecordWriter<RowData> createWriter(
+ BinaryRowData partition, int bucket, long maxSeqNum) {
+ DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket);
+ return new AppendOnlyWriter(
+ schemaId, fileFormat, targetFileSize, rowType, maxSeqNum, factory);
+ }
+}
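
A hedged sketch of the writer lifecycle this class enables (`write` is an AppendOnlyFileStoreWrite, `row` a RowData matching the table schema; compaction is deliberately unsupported for now):

    RecordWriter<RowData> writer = write.createWriter(partition, bucket, compactExecutor);
    writer.write(row);                            // append-only: rows are written as-is
    Increment increment = writer.prepareCommit(); // new files to hand to the committer
    writer.sync();
    writer.close();
    // write.createCompactWriter(...) currently throws UnsupportedOperationException
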
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index b224529b..fbe02fea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -19,37 +19,20 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.utils.RecordReader;
import java.io.IOException;
import java.util.List;
-/** Read operation which provides {@link RecordReader} creation. */
-public interface FileStoreRead {
-
- /** With drop delete records. */
- FileStoreRead withDropDelete(boolean dropDelete);
-
- /** With key nested projection. */
- FileStoreRead withKeyProjection(int[][] projectedFields);
-
- /** With value nested projection. */
- FileStoreRead withValueProjection(int[][] projectedFields);
+/**
+ * Read operation which provides {@link RecordReader} creation.
+ *
+ * @param <T> type of record to read.
+ */
+public interface FileStoreRead<T> {
- /**
- * Create a {@link RecordReader} from partition and bucket and files.
- *
- * <p>The resulting reader has the following characteristics:
- *
- * <ul>
- * <li>If {@link FileStoreRead#withKeyProjection} is called, key-values produced by this
- * reader may be unordered and may contain duplicated keys.
- * <li>If {@link FileStoreRead#withKeyProjection} is not called, key-values produced by this
- * reader is guaranteed to be ordered by keys and does not contain duplicated keys.
- * </ul>
- */
- RecordReader<KeyValue> createReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException;
+ /** Create a {@link RecordReader} from partition and bucket and files. */
+ RecordReader<T> createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
+ throws IOException;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 7a5eaa91..3d21746b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -41,10 +41,6 @@ public interface FileStoreScan {
FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
- FileStoreScan withKeyFilter(Predicate predicate);
-
- FileStoreScan withValueFilter(Predicate predicate);
-
FileStoreScan withBucket(int bucket);
FileStoreScan withSnapshot(long snapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index d9e020b1..930bce95 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -25,18 +25,23 @@ import org.apache.flink.table.store.file.writer.RecordWriter;
import java.util.List;
import java.util.concurrent.ExecutorService;
-/** Write operation which provides {@link RecordWriter} creation. */
-public interface FileStoreWrite {
+/**
+ * Write operation which provides {@link RecordWriter} creation.
+ *
+ * @param <T> type of record to write.
+ */
+public interface FileStoreWrite<T> {
/** Create a {@link RecordWriter} from partition and bucket. */
- RecordWriter createWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+ RecordWriter<T> createWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor);
/** Create an empty {@link RecordWriter} from partition and bucket. */
- RecordWriter createEmptyWriter(
+ RecordWriter<T> createEmptyWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor);
/** Create a compact {@link RecordWriter} from partition, bucket and restore files. */
- RecordWriter createCompactWriter(
+ RecordWriter<T> createCompactWriter(
BinaryRowData partition,
int bucket,
ExecutorService compactExecutor,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
similarity index 66%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 5d6ef5ca..12cc5ed9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.format.FileFormat;
@@ -33,102 +32,78 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-/** Default implementation of {@link FileStoreRead}. */
-public class FileStoreReadImpl implements FileStoreRead {
+/**
+ * {@link FileStoreRead} implementation for {@link
+ * org.apache.flink.table.store.file.KeyValueFileStore}.
+ */
+public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
private final DataFileReader.Factory dataFileReaderFactory;
- private final WriteMode writeMode;
private final Comparator<RowData> keyComparator;
- @Nullable private final MergeFunction mergeFunction;
+ private final MergeFunction mergeFunction;
private boolean keyProjected;
private boolean dropDelete = true;
- public FileStoreReadImpl(
+ public KeyValueFileStoreRead(
SchemaManager schemaManager,
long schemaId,
- WriteMode writeMode,
RowType keyType,
RowType valueType,
Comparator<RowData> keyComparator,
- @Nullable MergeFunction mergeFunction,
+ MergeFunction mergeFunction,
FileFormat fileFormat,
FileStorePathFactory pathFactory) {
this.dataFileReaderFactory =
new DataFileReader.Factory(
schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
- this.writeMode = writeMode;
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction;
this.keyProjected = false;
}
- @Override
- public FileStoreRead withDropDelete(boolean dropDelete) {
- Preconditions.checkArgument(
- writeMode != WriteMode.APPEND_ONLY || !dropDelete,
- "Cannot drop delete message for append-only table.");
+ public KeyValueFileStoreRead withDropDelete(boolean dropDelete) {
this.dropDelete = dropDelete;
return this;
}
- @Override
- public FileStoreRead withKeyProjection(int[][] projectedFields) {
+ public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
dataFileReaderFactory.withKeyProjection(projectedFields);
keyProjected = true;
return this;
}
- @Override
- public FileStoreRead withValueProjection(int[][] projectedFields) {
+ public KeyValueFileStoreRead withValueProjection(int[][] projectedFields) {
dataFileReaderFactory.withValueProjection(projectedFields);
return this;
}
+ /**
+ * The resulting reader has the following characteristics:
+ *
+ * <ul>
+ * <li>If {@link KeyValueFileStoreRead#withKeyProjection} is called, key-values produced by
+ * this reader may be unordered and may contain duplicated keys.
+ * <li>If {@link KeyValueFileStoreRead#withKeyProjection} is not called, key-values produced
+ *     by this reader are guaranteed to be ordered by keys and do not contain duplicated
+ * keys.
+ * </ul>
+ */
@Override
public RecordReader<KeyValue> createReader(
BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
- switch (writeMode) {
- case APPEND_ONLY:
- return createAppendOnlyReader(partition, bucket, files);
-
- case CHANGE_LOG:
- return createMergeTreeReader(partition, bucket, files);
-
- default:
- throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
- }
- }
-
- private RecordReader<KeyValue> createAppendOnlyReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
- DataFileReader dataFileReader = dataFileReaderFactory.create(partition, bucket);
- List<ConcatRecordReader.ReaderSupplier> suppliers = new ArrayList<>();
- for (DataFileMeta file : files) {
- suppliers.add(() -> dataFileReader.read(file.fileName()));
- }
-
- return ConcatRecordReader.create(suppliers);
- }
-
- private RecordReader<KeyValue> createMergeTreeReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
DataFileReader dataFileReader = dataFileReaderFactory.create(partition, bucket);
if (keyProjected) {
// key projection has been applied, so data file readers will not return key-values in
- // order,
- // we have to return the raw file contents without merging
- List<ConcatRecordReader.ReaderSupplier> suppliers = new ArrayList<>();
+ // order, we have to return the raw file contents without merging
+ List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
for (DataFileMeta file : files) {
suppliers.add(() -> dataFileReader.read(file.fileName()));
}
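
The ordering caveat documented above can be made concrete with a short sketch (`read` is a KeyValueFileStoreRead; partition, bucket and file metas are assumed to come from a scan plan):

    // without key projection: files are merged, keys come back ordered and de-duplicated
    RecordReader<KeyValue> merged =
            read.withDropDelete(true).createReader(partition, bucket, files);

    // with key projection: raw file contents are concatenated, duplicate keys are possible
    RecordReader<KeyValue> raw =
            read.withKeyProjection(new int[][] {{0}}).createReader(partition, bucket, files);
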
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
new file mode 100644
index 00000000..bdbf23fd
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreScan} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
+public class KeyValueFileStoreScan extends AbstractFileStoreScan {
+
+ private final FieldStatsArraySerializer keyStatsConverter;
+
+ private Predicate keyFilter;
+
+ public KeyValueFileStoreScan(
+ RowType partitionType,
+ RowType keyType,
+ SnapshotManager snapshotManager,
+ ManifestFile.Factory manifestFileFactory,
+ ManifestList.Factory manifestListFactory,
+ int numOfBuckets) {
+ super(
+ partitionType,
+ snapshotManager,
+ manifestFileFactory,
+ manifestListFactory,
+ numOfBuckets);
+ this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
+ }
+
+ public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
+ this.keyFilter = predicate;
+ return this;
+ }
+
+ @Override
+ protected boolean filterByStats(ManifestEntry entry) {
+ return keyFilter == null
+ || keyFilter.test(
+ entry.file().rowCount(), entry.file().keyStats().fields(keyStatsConverter));
+ }
+}
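
Key predicates are now pushed down through this dedicated scan class. A hedged usage sketch (`store` is a KeyValueFileStore, `keyPredicate` an assumed Predicate over the key fields):

    KeyValueFileStoreScan scan = store.newScan();
    scan.withKeyFilter(keyPredicate);              // checked against each file's keyStats
    List<ManifestEntry> entries = scan.plan().files();
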
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
similarity index 68%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index b28653f1..86d68ef5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -20,13 +20,11 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
@@ -41,13 +39,10 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
import org.apache.flink.table.store.file.writer.CompactWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -55,25 +50,16 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
-/** Default implementation of {@link FileStoreWrite}. */
-public class FileStoreWriteImpl implements FileStoreWrite {
+/** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
+public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
- private final WriteMode writeMode;
- private final SchemaManager schemaManager;
- private final long schemaId;
- private final RowType valueType;
private final DataFileReader.Factory dataFileReaderFactory;
private final DataFileWriter.Factory dataFileWriterFactory;
- private final FileFormat fileFormat;
private final Supplier<Comparator<RowData>> keyComparatorSupplier;
private final MergeFunction mergeFunction;
- private final FileStorePathFactory pathFactory;
- private final SnapshotManager snapshotManager;
- private final FileStoreScan scan;
private final MergeTreeOptions options;
- public FileStoreWriteImpl(
- WriteMode writeMode,
+ public KeyValueFileStoreWrite(
SchemaManager schemaManager,
long schemaId,
RowType keyType,
@@ -85,9 +71,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
SnapshotManager snapshotManager,
FileStoreScan scan,
MergeTreeOptions options) {
- this.schemaManager = schemaManager;
- this.schemaId = schemaId;
- this.valueType = valueType;
+ super(snapshotManager, scan);
this.dataFileReaderFactory =
new DataFileReader.Factory(
schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
@@ -99,69 +83,26 @@ public class FileStoreWriteImpl implements FileStoreWrite {
fileFormat,
pathFactory,
options.targetFileSize);
- this.writeMode = writeMode;
- this.fileFormat = fileFormat;
this.keyComparatorSupplier = keyComparatorSupplier;
this.mergeFunction = mergeFunction;
- this.pathFactory = pathFactory;
- this.snapshotManager = snapshotManager;
- this.scan = scan;
this.options = options;
}
@Override
- public RecordWriter createWriter(
+ public RecordWriter<KeyValue> createWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- List<DataFileMeta> existingFileMetas = Lists.newArrayList();
- if (latestSnapshotId != null) {
- // Concat all the DataFileMeta of existing files into existingFileMetas.
- scan.withSnapshot(latestSnapshotId)
- .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
- .plan().files().stream()
- .map(ManifestEntry::file)
- .forEach(existingFileMetas::add);
- }
-
- switch (writeMode) {
- case APPEND_ONLY:
- DataFilePathFactory factory =
- pathFactory.createDataFilePathFactory(partition, bucket);
- long maxSeqNum =
- existingFileMetas.stream()
- .map(DataFileMeta::maxSequenceNumber)
- .max(Long::compare)
- .orElse(-1L);
-
- return new AppendOnlyWriter(
- schemaId,
- fileFormat,
- options.targetFileSize,
- valueType,
- maxSeqNum,
- factory);
-
- case CHANGE_LOG:
- if (latestSnapshotId == null) {
- return createEmptyWriter(partition, bucket, compactExecutor);
- } else {
- return createMergeTreeWriter(
- partition, bucket, existingFileMetas, compactExecutor);
- }
-
- default:
- throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
- }
+ return createMergeTreeWriter(
+ partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor);
}
@Override
- public RecordWriter createEmptyWriter(
+ public RecordWriter<KeyValue> createEmptyWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
return createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
}
@Override
- public RecordWriter createCompactWriter(
+ public RecordWriter<KeyValue> createCompactWriter(
BinaryRowData partition,
int bucket,
ExecutorService compactExecutor,
@@ -177,16 +118,11 @@ public class FileStoreWriteImpl implements FileStoreWrite {
compactExecutor));
}
- private RecordWriter createMergeTreeWriter(
+ private RecordWriter<KeyValue> createMergeTreeWriter(
BinaryRowData partition,
int bucket,
List<DataFileMeta> restoreFiles,
ExecutorService compactExecutor) {
- long maxSequenceNumber =
- restoreFiles.stream()
- .map(DataFileMeta::maxSequenceNumber)
- .max(Long::compare)
- .orElse(-1L);
DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
return new MergeTreeWriter(
@@ -204,7 +140,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
options.numSortedRunCompactionTrigger),
compactExecutor),
new Levels(keyComparator, restoreFiles, options.numLevels),
- maxSequenceNumber,
+ getMaxSequenceNumber(restoreFiles),
keyComparator,
mergeFunction.copy(),
dataFileWriter,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
index de625beb..884af4e8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
@@ -18,8 +18,7 @@
package org.apache.flink.table.store.file.writer;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutionException;
* A {@link RecordWriter} implementation that only perform compaction on existing records and does
* not generate new records.
*/
-public class CompactWriter implements RecordWriter {
+public class CompactWriter implements RecordWriter<KeyValue> {
private final CompactUnit unit;
private final CompactManager compactManager;
@@ -73,7 +72,7 @@ public class CompactWriter implements RecordWriter {
}
@Override
- public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+ public void write(KeyValue kv) throws Exception {
// nothing to write
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
index 4f29e278..5b0ce6aa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.file.writer;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Increment;
@@ -29,11 +27,13 @@ import java.util.List;
* The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
* write yet un-staged data. The incremental files ready to commit is returned to the system by the
* {@link #prepareCommit()}.
+ *
+ * @param <T> type of record to write.
*/
-public interface RecordWriter {
+public interface RecordWriter<T> {
/** Add a key-value element to the writer. */
- void write(ValueKind valueKind, RowData key, RowData value) throws Exception;
+ void write(T record) throws Exception;
/**
* Prepare for a commit.
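
With the new type parameter, the two write paths differ only in the record type they accept. A hedged sketch of both call sites (writers assumed to come from the corresponding FileStoreWrite implementations):

    RecordWriter<RowData> appendWriter =
            appendOnlyWrite.createWriter(partition, bucket, compactExecutor);
    appendWriter.write(row);                       // plain rows for append-only tables

    RecordWriter<KeyValue> kvWriter =
            keyValueWrite.createWriter(partition, bucket, compactExecutor);
    KeyValue kv = new KeyValue();
    kv.replace(key, ValueKind.ADD, value);         // key, kind and value for changelog tables
    kvWriter.write(kv);
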
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index ff0c8539..7f0de998 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -19,65 +19,67 @@
package org.apache.flink.table.store.table;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.AppendOnlyFileStore;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
+import java.io.IOException;
+import java.util.List;
+
/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
private static final long serialVersionUID = 1L;
- private final FileStoreImpl store;
+ private final AppendOnlyFileStore store;
AppendOnlyFileStoreTable(String name, SchemaManager schemaManager, Schema schema, String user) {
super(name, schema);
this.store =
- new FileStoreImpl(
+ new AppendOnlyFileStore(
schemaManager,
schema.id(),
new FileStoreOptions(schema.options()),
- WriteMode.APPEND_ONLY,
user,
schema.logicalPartitionType(),
- RowType.of(),
- schema.logicalRowType(),
- null);
+ schema.logicalRowType());
}
@Override
public TableScan newScan() {
- return new TableScan(store.newScan(), schema, store.pathFactory()) {
+ AppendOnlyFileStoreScan scan = store.newScan();
+ return new TableScan(scan, schema, store.pathFactory()) {
@Override
protected void withNonPartitionFilter(Predicate predicate) {
- scan.withValueFilter(predicate);
+ scan.withFilter(predicate);
}
};
}
@Override
public TableRead newRead() {
- return new TableRead(store.newRead()) {
+ AppendOnlyFileStoreRead read = store.newRead();
+ return new TableRead() {
@Override
public TableRead withProjection(int[][] projection) {
- read.withValueProjection(projection);
+ read.withProjection(projection);
return this;
}
@@ -87,9 +89,10 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
- RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
- return new ValueContentRowDataRecordIterator(kvRecordIterator);
+ public RecordReader<RowData> createReader(
+ BinaryRowData partition, int bucket, List<DataFileMeta> files)
+ throws IOException {
+ return read.createReader(partition, bucket, files);
}
};
}
@@ -98,21 +101,21 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
public TableWrite newWrite() {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), schema);
- return new TableWrite(store.newWrite(), recordConverter) {
+ return new AbstractTableWrite<RowData>(store.newWrite(), recordConverter) {
@Override
- protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ protected void writeSinkRecord(SinkRecord record, RecordWriter<RowData> writer)
throws Exception {
Preconditions.checkState(
record.row().getRowKind() == RowKind.INSERT,
"Append only writer can not accept row with RowKind %s",
record.row().getRowKind());
- writer.write(ValueKind.ADD, BinaryRowDataUtil.EMPTY_ROW, record.row());
+ writer.write(record.row());
}
};
}
@Override
- public FileStoreImpl store() {
+ public AppendOnlyFileStore store() {
return store;
}
}
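
At the table level the append-only path only accepts inserts. A hedged end-to-end sketch (`table` is an AppendOnlyFileStoreTable whose schema is assumed to be (INT, STRING)):

    TableWrite write = table.newWrite();
    write.write(GenericRowData.of(1, StringData.fromString("a"))); // RowKind.INSERT by default
    List<FileCommittable> committables = write.prepareCommit();
    write.close();
    // rows with UPDATE_* or DELETE kinds fail the precondition above with an IllegalStateException
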
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 79785d6e..c9f85f80 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -20,21 +20,24 @@ package org.apache.flink.table.store.table;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -47,7 +50,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
private static final long serialVersionUID = 1L;
- private final FileStoreImpl store;
+ private final KeyValueFileStore store;
ChangelogValueCountFileStoreTable(
String name, SchemaManager schemaManager, Schema schema, String user) {
@@ -57,11 +60,10 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
MergeFunction mergeFunction = new ValueCountMergeFunction();
this.store =
- new FileStoreImpl(
+ new KeyValueFileStore(
schemaManager,
schema.id(),
new FileStoreOptions(schema.options()),
- WriteMode.CHANGE_LOG,
user,
schema.logicalPartitionType(),
schema.logicalRowType(),
@@ -71,7 +73,8 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@Override
public TableScan newScan() {
- return new TableScan(store.newScan(), schema, store.pathFactory()) {
+ KeyValueFileStoreScan scan = store.newScan();
+ return new TableScan(scan, schema, store.pathFactory()) {
@Override
protected void withNonPartitionFilter(Predicate predicate) {
scan.withKeyFilter(predicate);
@@ -81,7 +84,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@Override
public TableRead newRead() {
- return new TableRead(store.newRead()) {
+ return new KeyValueTableRead(store.newRead()) {
private int[][] projection = null;
private boolean isIncremental = false;
@@ -114,29 +117,31 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
public TableWrite newWrite() {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), schema);
- return new TableWrite(store.newWrite(), recordConverter) {
+ return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
@Override
- protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
throws Exception {
+ KeyValue kv = new KeyValue();
switch (record.row().getRowKind()) {
case INSERT:
case UPDATE_AFTER:
- writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1L));
+ kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(1L));
break;
case UPDATE_BEFORE:
case DELETE:
- writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1L));
+ kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(-1L));
break;
default:
throw new UnsupportedOperationException(
"Unknown row kind " + record.row().getRowKind());
}
+ writer.write(kv);
}
};
}
@Override
- public FileStoreImpl store() {
+ public KeyValueFileStore store() {
return store;
}
}
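
The value-count encoding above turns row kinds into +1/-1 counts, always under ValueKind.ADD. A hedged sketch of the two cases (`row` is the full RowData acting as the key here):

    KeyValue insert = new KeyValue();
    insert.replace(row, ValueKind.ADD, GenericRowData.of(1L));   // INSERT / UPDATE_AFTER
    KeyValue delete = new KeyValue();
    delete.replace(row, ValueKind.ADD, GenericRowData.of(-1L));  // UPDATE_BEFORE / DELETE
    // ValueCountMergeFunction is expected to sum the counts per key, so this pair cancels out
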
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 6d611638..7109a91a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -20,23 +20,26 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
@@ -53,7 +56,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
private static final long serialVersionUID = 1L;
- private final FileStoreImpl store;
+ private final KeyValueFileStore store;
ChangelogWithKeyFileStoreTable(
String name, SchemaManager schemaManager, Schema schema, String user) {
@@ -93,11 +96,10 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
}
this.store =
- new FileStoreImpl(
+ new KeyValueFileStore(
schemaManager,
schema.id(),
new FileStoreOptions(conf),
- WriteMode.CHANGE_LOG,
user,
schema.logicalPartitionType(),
keyType,
@@ -107,7 +109,8 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableScan newScan() {
- return new TableScan(store.newScan(), schema, store.pathFactory()) {
+ KeyValueFileStoreScan scan = store.newScan();
+ return new TableScan(scan, schema, store.pathFactory()) {
@Override
protected void withNonPartitionFilter(Predicate predicate) {
// currently we can only perform filter push down on keys
@@ -138,7 +141,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableRead newRead() {
- return new TableRead(store.newRead()) {
+ return new KeyValueTableRead(store.newRead()) {
@Override
public TableRead withProjection(int[][] projection) {
read.withValueProjection(projection);
@@ -163,29 +166,31 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
public TableWrite newWrite() {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), schema);
- return new TableWrite(store.newWrite(), recordConverter) {
+ return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
@Override
- protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
throws Exception {
+ KeyValue kv = new KeyValue();
switch (record.row().getRowKind()) {
case INSERT:
case UPDATE_AFTER:
- writer.write(ValueKind.ADD, record.primaryKey(), record.row());
+ kv.replace(record.primaryKey(), ValueKind.ADD, record.row());
break;
case UPDATE_BEFORE:
case DELETE:
- writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
+ kv.replace(record.primaryKey(), ValueKind.DELETE, record.row());
break;
default:
throw new UnsupportedOperationException(
"Unknown row kind " + record.row().getRowKind());
}
+ writer.write(kv);
}
};
}
@Override
- public FileStoreImpl store() {
+ public KeyValueFileStore store() {
return store;
}
}
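
For the primary-key table the mapping keeps the value kind instead of a count. A hedged sketch of the encoding used in writeSinkRecord above (`primaryKey` and `row` assumed):

    KeyValue upsert = new KeyValue();
    upsert.replace(primaryKey, ValueKind.ADD, row);       // INSERT / UPDATE_AFTER
    KeyValue retract = new KeyValue();
    retract.replace(primaryKey, ValueKind.DELETE, row);   // UPDATE_BEFORE / DELETE
    // the configured MergeFunction (e.g. DeduplicateMergeFunction) then resolves rows per key
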
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
similarity index 75%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 2b70a329..1e45320e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -33,18 +33,22 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-/** An abstraction layer above {@link FileStoreWrite} to provide {@link RowData} writing. */
-public abstract class TableWrite {
+/**
+ * Base {@link TableWrite} implementation.
+ *
+ * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
+ */
+public abstract class AbstractTableWrite<T> implements TableWrite {
- private final FileStoreWrite write;
+ private final FileStoreWrite<T> write;
private final SinkRecordConverter recordConverter;
- private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
+ private final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
private final ExecutorService compactExecutor;
private boolean overwrite = false;
- protected TableWrite(FileStoreWrite write, SinkRecordConverter recordConverter) {
+ protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
this.write = write;
this.recordConverter = recordConverter;
@@ -54,36 +58,40 @@ public abstract class TableWrite {
new ExecutorThreadFactory("compaction-thread"));
}
+ @Override
public TableWrite withOverwrite(boolean overwrite) {
this.overwrite = overwrite;
return this;
}
+ @Override
public SinkRecordConverter recordConverter() {
return recordConverter;
}
+ @Override
public SinkRecord write(RowData rowData) throws Exception {
SinkRecord record = recordConverter.convert(rowData);
- RecordWriter writer = getWriter(record.partition(), record.bucket());
+ RecordWriter<T> writer = getWriter(record.partition(), record.bucket());
writeSinkRecord(record, writer);
return record;
}
+ @Override
public List<FileCommittable> prepareCommit() throws Exception {
List<FileCommittable> result = new ArrayList<>();
- Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
+ Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
writers.entrySet().iterator();
while (partIter.hasNext()) {
- Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = partIter.next();
+ Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partEntry = partIter.next();
BinaryRowData partition = partEntry.getKey();
- Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
+ Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter =
partEntry.getValue().entrySet().iterator();
while (bucketIter.hasNext()) {
- Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
+ Map.Entry<Integer, RecordWriter<T>> entry = bucketIter.next();
int bucket = entry.getKey();
- RecordWriter writer = entry.getValue();
+ RecordWriter<T> writer = entry.getValue();
FileCommittable committable =
new FileCommittable(partition, bucket, writer.prepareCommit());
result.add(committable);
@@ -105,15 +113,16 @@ public abstract class TableWrite {
return result;
}
- private void closeWriter(RecordWriter writer) throws Exception {
+ private void closeWriter(RecordWriter<T> writer) throws Exception {
writer.sync();
writer.close();
}
+ @Override
public void close() throws Exception {
compactExecutor.shutdownNow();
- for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
- for (RecordWriter writer : bucketWriters.values()) {
+ for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
+ for (RecordWriter<T> writer : bucketWriters.values()) {
closeWriter(writer);
}
}
@@ -121,15 +130,15 @@ public abstract class TableWrite {
}
@VisibleForTesting
- public Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
+ public Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers() {
return writers;
}
- protected abstract void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ protected abstract void writeSinkRecord(SinkRecord record, RecordWriter<T> writer)
throws Exception;
- private RecordWriter getWriter(BinaryRowData partition, int bucket) {
- Map<Integer, RecordWriter> buckets = writers.get(partition);
+ private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
+ Map<Integer, RecordWriter<T>> buckets = writers.get(partition);
if (buckets == null) {
buckets = new HashMap<>();
writers.put(partition.copy(), buckets);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 2b70a329..5384f892 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -18,127 +18,23 @@
package org.apache.flink.table.store.table.sink;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-/** An abstraction layer above {@link FileStoreWrite} to provide {@link RowData} writing. */
-public abstract class TableWrite {
-
- private final FileStoreWrite write;
- private final SinkRecordConverter recordConverter;
-
- private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
- private final ExecutorService compactExecutor;
-
- private boolean overwrite = false;
-
- protected TableWrite(FileStoreWrite write, SinkRecordConverter recordConverter) {
- this.write = write;
- this.recordConverter = recordConverter;
-
- this.writers = new HashMap<>();
- this.compactExecutor =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory("compaction-thread"));
- }
-
- public TableWrite withOverwrite(boolean overwrite) {
- this.overwrite = overwrite;
- return this;
- }
-
- public SinkRecordConverter recordConverter() {
- return recordConverter;
- }
-
- public SinkRecord write(RowData rowData) throws Exception {
- SinkRecord record = recordConverter.convert(rowData);
- RecordWriter writer = getWriter(record.partition(), record.bucket());
- writeSinkRecord(record, writer);
- return record;
- }
-
- public List<FileCommittable> prepareCommit() throws Exception {
- List<FileCommittable> result = new ArrayList<>();
-
- Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
- writers.entrySet().iterator();
- while (partIter.hasNext()) {
- Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = partIter.next();
- BinaryRowData partition = partEntry.getKey();
- Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
- partEntry.getValue().entrySet().iterator();
- while (bucketIter.hasNext()) {
- Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
- int bucket = entry.getKey();
- RecordWriter writer = entry.getValue();
- FileCommittable committable =
- new FileCommittable(partition, bucket, writer.prepareCommit());
- result.add(committable);
-
- // clear if no update
- // we need a mechanism to clear writers, otherwise there will be more and more
- // such as yesterday's partition that no longer needs to be written.
- if (committable.increment().newFiles().isEmpty()) {
- closeWriter(writer);
- bucketIter.remove();
- }
- }
-
- if (partEntry.getValue().isEmpty()) {
- partIter.remove();
- }
- }
-
- return result;
- }
+/**
+ * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to
+ * provide {@link RowData} writing.
+ */
+public interface TableWrite {
- private void closeWriter(RecordWriter writer) throws Exception {
- writer.sync();
- writer.close();
- }
+ TableWrite withOverwrite(boolean overwrite);
- public void close() throws Exception {
- compactExecutor.shutdownNow();
- for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
- for (RecordWriter writer : bucketWriters.values()) {
- closeWriter(writer);
- }
- }
- writers.clear();
- }
+ SinkRecordConverter recordConverter();
- @VisibleForTesting
- public Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
- return writers;
- }
+ SinkRecord write(RowData rowData) throws Exception;
- protected abstract void writeSinkRecord(SinkRecord record, RecordWriter writer)
- throws Exception;
+ List<FileCommittable> prepareCommit() throws Exception;
- private RecordWriter getWriter(BinaryRowData partition, int bucket) {
- Map<Integer, RecordWriter> buckets = writers.get(partition);
- if (buckets == null) {
- buckets = new HashMap<>();
- writers.put(partition.copy(), buckets);
- }
- return buckets.computeIfAbsent(
- bucket,
- k ->
- overwrite
- ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
- : write.createWriter(partition.copy(), bucket, compactExecutor));
- }
+ void close() throws Exception;
}
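The new TableWrite contract above reduces to a short lifecycle: configure overwrite behaviour, write rows, collect one FileCommittable per touched bucket, close. The following is a minimal usage sketch of that call order only; TableWriteUsageSketch is illustrative and not part of this commit, and obtaining the TableWrite instance (e.g. from a table's newWrite() factory, as the test classes further down appear to do) is an assumption.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableWrite;

import java.util.List;

/** Sketch only: the intended call order on the new TableWrite interface. */
class TableWriteUsageSketch {
    static List<FileCommittable> writeAll(TableWrite write, Iterable<RowData> rows) throws Exception {
        write.withOverwrite(false);        // false keeps existing data; true makes getWriter create empty writers
        for (RowData row : rows) {
            write.write(row);              // converts the row and routes it to a partition/bucket writer
        }
        List<FileCommittable> committables = write.prepareCommit(); // one committable per partition/bucket
        write.close();                     // shuts down the compaction executor and all remaining writers
        return committables;
    }
}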
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
similarity index 75%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
index 53ff22e6..f5336065 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
@@ -22,36 +22,26 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
import org.apache.flink.table.store.file.utils.RecordReader;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
-/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
-public abstract class TableRead {
+/**
+ * An abstraction layer above {@link KeyValueFileStoreRead} to provide reading of {@link RowData}.
+ */
+public abstract class KeyValueTableRead implements TableRead {
- protected final FileStoreRead read;
+ protected final KeyValueFileStoreRead read;
- protected TableRead(FileStoreRead read) {
+ protected KeyValueTableRead(KeyValueFileStoreRead read) {
this.read = read;
}
- // TODO support filter push down
-
- public TableRead withProjection(int[] projection) {
- int[][] nestedProjection =
- Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new);
- return withProjection(nestedProjection);
- }
-
- public abstract TableRead withProjection(int[][] projection);
-
- public abstract TableRead withIncremental(boolean isIncremental);
-
+ @Override
public RecordReader<RowData> createReader(
BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
return new RowDataRecordReader(read.createReader(partition, bucket, files));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index 53ff22e6..013c1b49 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -20,64 +20,29 @@ package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.utils.RecordReader;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
-public abstract class TableRead {
-
- protected final FileStoreRead read;
-
- protected TableRead(FileStoreRead read) {
- this.read = read;
- }
+public interface TableRead {
// TODO support filter push down
- public TableRead withProjection(int[] projection) {
+ default TableRead withProjection(int[] projection) {
int[][] nestedProjection =
Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new);
return withProjection(nestedProjection);
}
- public abstract TableRead withProjection(int[][] projection);
-
- public abstract TableRead withIncremental(boolean isIncremental);
-
- public RecordReader<RowData> createReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
- return new RowDataRecordReader(read.createReader(partition, bucket, files));
- }
-
- protected abstract RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
- RecordReader.RecordIterator<KeyValue> kvRecordIterator);
-
- private class RowDataRecordReader implements RecordReader<RowData> {
+ TableRead withProjection(int[][] projection);
- private final RecordReader<KeyValue> wrapped;
+ TableRead withIncremental(boolean isIncremental);
- private RowDataRecordReader(RecordReader<KeyValue> wrapped) {
- this.wrapped = wrapped;
- }
-
- @Nullable
- @Override
- public RecordIterator<RowData> readBatch() throws IOException {
- RecordIterator<KeyValue> batch = wrapped.readBatch();
- return batch == null ? null : rowDataRecordIteratorFromKv(batch);
- }
-
- @Override
- public void close() throws IOException {
- wrapped.close();
- }
- }
+ RecordReader<RowData> createReader(
+ BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException;
}
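The only concrete member of the new TableRead interface is the default withProjection(int[]) kept above: it wraps every top-level field index into a one-element path and delegates to withProjection(int[][]). The standalone sketch below merely restates that translation to make the mapping concrete; ProjectionSketch is illustrative and not part of the commit.

import java.util.Arrays;

/** Illustrative only: mirrors the default TableRead#withProjection(int[]) translation. */
class ProjectionSketch {
    static int[][] toNestedProjection(int[] topLevelProjection) {
        // Each top-level index i becomes the single-element path {i}.
        return Arrays.stream(topLevelProjection)
                .mapToObj(i -> new int[] {i})
                .toArray(int[][]::new);
    }

    public static void main(String[] args) {
        // {2, 1} -> [[2], [1]], the nested form expected by withProjection(int[][]).
        System.out.println(Arrays.deepToString(toNestedProjection(new int[] {2, 1})));
    }
}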
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index ada81909..1769e6e0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -36,7 +36,7 @@ import java.util.Optional;
/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
public abstract class TableScan {
- protected final FileStoreScan scan;
+ private final FileStoreScan scan;
private final Schema schema;
private final FileStorePathFactory pathFactory;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 7b872218..655b38db 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -69,7 +69,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** {@link FileStore} for tests. */
-public class TestFileStore extends FileStoreImpl {
+public class TestFileStore extends KeyValueFileStore {
private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
@@ -115,7 +115,6 @@ public class TestFileStore extends FileStoreImpl {
new SchemaManager(options.path()),
0L,
options,
- WriteMode.CHANGE_LOG,
UUID.randomUUID().toString(),
partitionType,
keyType,
@@ -182,12 +181,17 @@ public class TestFileStore extends FileStoreImpl {
List<KeyValue> kvs,
Function<KeyValue, BinaryRowData> partitionCalculator,
Function<KeyValue, Integer> bucketCalculator,
- QuadFunction<FileStoreWrite, BinaryRowData, Integer, ExecutorService, RecordWriter>
+ QuadFunction<
+ FileStoreWrite<KeyValue>,
+ BinaryRowData,
+ Integer,
+ ExecutorService,
+ RecordWriter<KeyValue>>
createWriterFunction,
BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
throws Exception {
- FileStoreWrite write = newWrite();
- Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
+ FileStoreWrite<KeyValue> write = newWrite();
+ Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers = new HashMap<>();
for (KeyValue kv : kvs) {
BinaryRowData partition = partitionCalculator.apply(kv);
int bucket = bucketCalculator.apply(kv);
@@ -203,15 +207,15 @@ public class TestFileStore extends FileStoreImpl {
return w;
}
})
- .write(kv.valueKind(), kv.key(), kv.value());
+ .write(kv);
}
FileStoreCommit commit = newCommit();
ManifestCommittable committable =
new ManifestCommittable(String.valueOf(new Random().nextLong()));
- for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
+ for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
writers.entrySet()) {
- for (Map.Entry<Integer, RecordWriter> entryWithBucket :
+ for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
entryWithPartition.getValue().entrySet()) {
Increment increment = entryWithBucket.getValue().prepareCommit();
committable.addFileCommittable(
@@ -271,7 +275,7 @@ public class TestFileStore extends FileStoreImpl {
}
List<KeyValue> kvs = new ArrayList<>();
- FileStoreRead read = newRead();
+ FileStoreRead<KeyValue> read = newRead();
for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entryWithPartition :
filesPerPartitionAndBucket.entrySet()) {
for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
similarity index 90%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index f5927de3..774676ad 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.flink.table.store.file.writer;
+package org.apache.flink.table.store.file.data;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -26,13 +26,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -71,7 +69,7 @@ public class AppendOnlyWriterTest {
@Test
public void testEmptyCommits() throws Exception {
- RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
+ RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
for (int i = 0; i < 3; i++) {
writer.sync();
@@ -85,8 +83,8 @@ public class AppendOnlyWriterTest {
@Test
public void testSingleWrite() throws Exception {
- RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
- writer.write(ValueKind.ADD, EMPTY_ROW, row(1, "AAA", PART));
+ RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
+ writer.write(row(1, "AAA", PART));
List<DataFileMeta> result = writer.close();
@@ -115,7 +113,7 @@ public class AppendOnlyWriterTest {
@Test
public void testMultipleCommits() throws Exception {
- RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
+ RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
// Commit 5 continues txn.
for (int txn = 0; txn < 5; txn += 1) {
@@ -124,7 +122,7 @@ public class AppendOnlyWriterTest {
int start = txn * 100;
int end = txn * 100 + 100;
for (int i = start; i < end; i++) {
- writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART));
+ writer.write(row(i, String.format("%03d", i), PART));
}
writer.sync();
@@ -161,10 +159,10 @@ public class AppendOnlyWriterTest {
public void testRollingWrite() throws Exception {
// Set a very small target file size, so that we will roll over to a new file even if
// writing one record.
- RecordWriter writer = createWriter(10L, SCHEMA, 0);
+ RecordWriter<RowData> writer = createWriter(10L, SCHEMA, 0);
for (int i = 0; i < 10; i++) {
- writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART));
+ writer.write(row(i, String.format("%03d", i), PART));
}
writer.sync();
@@ -220,7 +218,8 @@ public class AppendOnlyWriterTest {
FileStoreOptions.FILE_FORMAT.defaultValue());
}
- private RecordWriter createWriter(long targetFileSize, RowType writeSchema, long maxSeqNum) {
+ private RecordWriter<RowData> createWriter(
+ long targetFileSize, RowType writeSchema, long maxSeqNum) {
FileFormat fileFormat =
FileFormat.fromIdentifier(
Thread.currentThread().getContextClassLoader(), AVRO, new Configuration());
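The rewritten test above drives a plain RecordWriter<RowData>: rows are written directly (no ValueKind or key arguments), sync() flushes in-flight records, and close() returns the data files that were rolled. A compressed sketch of that pattern follows; AppendOnlySketch and its GenericRowData payload are illustrative stand-ins, not the test's own helpers.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.writer.RecordWriter;

import java.util.List;

/** Sketch of the append-only write pattern exercised by AppendOnlyWriterTest. */
class AppendOnlySketch {
    static List<DataFileMeta> writeRows(RecordWriter<RowData> writer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            writer.write(GenericRowData.of(i)); // payload must match the writer's row schema; no ValueKind / key
        }
        writer.sync();          // flush in-flight records before closing
        return writer.close();  // data files rolled by the writer
    }
}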
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 331e8a9e..16137dd3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -82,7 +82,7 @@ public class MergeTreeTest {
private MergeTreeOptions options;
private DataFileReader dataFileReader;
private DataFileWriter dataFileWriter;
- private RecordWriter writer;
+ private RecordWriter<KeyValue> writer;
@BeforeEach
public void beforeEach() throws IOException {
@@ -357,7 +357,8 @@ public class MergeTreeTest {
private void writeAll(List<TestRecord> records) throws Exception {
for (TestRecord record : records) {
- writer.write(record.kind, row(record.k), row(record.v));
+ KeyValue kv = new KeyValue().replace(row(record.k), record.kind, row(record.v));
+ writer.write(kv);
}
}
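writeAll() above shows the new single-argument contract on the merge-tree side: the caller builds a KeyValue (replace() is fluent and returns the same instance) and hands it to RecordWriter<KeyValue>#write, instead of passing (ValueKind, key, value) separately. Below is a minimal sketch of the same call shape outside the test; KeyValueWriteSketch is illustrative, and the RowData parameter types are an assumption based on how the test constructs the key and value rows.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.writer.RecordWriter;

/** Sketch of the KeyValue-based write call used by MergeTreeTest and TestCommitThread. */
class KeyValueWriteSketch {
    static void add(RecordWriter<KeyValue> writer, RowData key, RowData value) throws Exception {
        KeyValue kv = new KeyValue().replace(key, ValueKind.ADD, value); // replace() reuses this KeyValue instance
        writer.write(kv);                                                // single write(kv) call replaces write(kind, key, value)
    }
}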
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
similarity index 98%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index a3378ed1..af4f774c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -50,8 +50,8 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link FileStoreReadImpl}. */
-public class FileStoreReadTest {
+/** Tests for {@link KeyValueFileStoreRead}. */
+public class KeyValueFileStoreReadTest {
@TempDir java.nio.file.Path tempDir;
@@ -189,7 +189,7 @@ public class FileStoreReadTest {
scan.withSnapshot(store.snapshotManager().latestSnapshotId()).plan().files()
.stream()
.collect(Collectors.groupingBy(ManifestEntry::partition));
- FileStoreRead read = store.newRead();
+ KeyValueFileStoreRead read = store.newRead();
if (keyProjection != null) {
read.withKeyProjection(keyProjection);
read.withDropDelete(false);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
similarity index 90%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index f6245761..74f7ecae 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -46,8 +46,8 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link FileStoreScanImpl}. */
-public class FileStoreScanTest {
+/** Tests for {@link KeyValueFileStoreScan}. */
+public class KeyValueFileStoreScanTest {
private static final int NUM_BUCKETS = 10;
@@ -112,7 +112,7 @@ public class FileStoreScanTest {
int wantedShopId = data.get(random.nextInt(data.size())).key().getInt(0);
- FileStoreScan scan = store.newScan();
+ KeyValueFileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
scan.withKeyFilter(
PredicateBuilder.equal(0, new Literal(new IntType(false), wantedShopId)));
@@ -125,27 +125,6 @@ public class FileStoreScanTest {
runTestContainsAll(scan, snapshot.id(), expected);
}
- @Test
- public void testWithValueFilter() throws Exception {
- ThreadLocalRandom random = ThreadLocalRandom.current();
- List<KeyValue> data = generateData(random.nextInt(1000) + 1);
- Snapshot snapshot = writeData(data);
-
- int wantedShopId = data.get(random.nextInt(data.size())).value().getInt(2);
-
- FileStoreScan scan = store.newScan();
- scan.withSnapshot(snapshot.id());
- scan.withValueFilter(
- PredicateBuilder.equal(2, new Literal(new IntType(false), wantedShopId)));
-
- Map<BinaryRowData, BinaryRowData> expected =
- store.toKvMap(
- data.stream()
- .filter(kv -> kv.value().getInt(2) == wantedShopId)
- .collect(Collectors.toList()));
- runTestContainsAll(scan, snapshot.id(), expected);
- }
-
@Test
public void testWithBucket() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index c65ef8bb..d51e4416 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -49,7 +49,7 @@ public class TestCommitThread extends Thread {
private final Map<BinaryRowData, List<KeyValue>> result;
private final Map<BinaryRowData, MergeTreeWriter> writers;
- private final FileStoreWrite write;
+ private final FileStoreWrite<KeyValue> write;
private final FileStoreCommit commit;
public TestCommitThread(
@@ -150,7 +150,7 @@ public class TestCommitThread extends Thread {
MergeTreeWriter writer =
writers.compute(partition, (p, w) -> w == null ? createWriter(p, false) : w);
for (KeyValue kv : changes) {
- writer.write(kv.valueKind(), kv.key(), kv.value());
+ writer.write(kv);
}
}
@@ -169,7 +169,7 @@ public class TestCommitThread extends Thread {
MergeTreeWriter writer = createWriter(partition, true);
writers.put(partition, writer);
for (KeyValue kv : changes) {
- writer.write(kv.valueKind(), kv.key(), kv.value());
+ writer.write(kv);
}
return partition;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 22460fee..0ee8e79f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -45,8 +44,6 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link AppendOnlyFileStoreTable}. */
-// TODO enable this test class after append only file store with avro format is fixed
-@Disabled
public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
@TempDir java.nio.file.Path tempDir;
@@ -59,10 +56,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(
+ .hasSameElementsAs(
Arrays.asList("1|10|100", "1|11|101", "1|12|102", "1|11|101", "1|12|102"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
+ .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
}
@Test
@@ -73,9 +70,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("100|10", "101|11", "102|12", "101|11", "102|12"));
+ .hasSameElementsAs(Arrays.asList("100|10", "101|11", "102|12", "101|11", "102|12"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("200|20", "201|21", "202|22", "201|21"));
+ .hasSameElementsAs(Arrays.asList("200|20", "201|21", "202|22", "201|21"));
}
@Test
@@ -90,7 +87,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(
+ .hasSameElementsAs(
Arrays.asList(
"2|21|201",
// this record is in the same file with the first "2|21|201"
@@ -108,7 +105,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("+2|21|201"));
+ .isEqualTo(Collections.singletonList("+2|21|201"));
}
@Test
@@ -119,9 +116,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("+101|11", "+102|12"));
+ .isEqualTo(Arrays.asList("+101|11", "+102|12"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("+201|21"));
+ .isEqualTo(Collections.singletonList("+201|21"));
}
@Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 7154062c..b6fa2abc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -57,9 +57,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("1|11|101", "1|12|102", "1|11|101"));
+ .isEqualTo(Arrays.asList("1|11|101", "1|11|101", "1|12|102"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
+ .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
}
@Test
@@ -70,9 +70,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("101|11", "102|12", "101|11"));
+ .isEqualTo(Arrays.asList("101|11", "101|11", "102|12"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("200|20", "201|21", "202|22"));
+ .isEqualTo(Arrays.asList("200|20", "201|21", "202|22"));
}
@Test
@@ -87,7 +87,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(
+ .isEqualTo(
Arrays.asList(
"2|21|201",
// this record is in the same file with "delete 2|21|201"
@@ -102,9 +102,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
TableRead read = table.newRead().withIncremental(true);
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("+1|11|101", "-1|10|100"));
+ .isEqualTo(Arrays.asList("-1|10|100", "+1|11|101"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("+2|22|202", "-2|21|201"));
+ .isEqualTo(Arrays.asList("-2|21|201", "-2|21|201", "+2|22|202"));
}
@Test
@@ -115,9 +115,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("+101|11", "-100|10"));
+ .isEqualTo(Arrays.asList("-100|10", "+101|11"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("+202|22", "-201|21"));
+ .isEqualTo(Arrays.asList("-201|21", "-201|21", "+202|22"));
}
@Test
@@ -133,8 +133,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
TableRead read = table.newRead().withIncremental(true);
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .hasSameElementsAs(
+ .isEqualTo(
Arrays.asList(
+ "-2|21|201",
"-2|21|201",
// this record is in the same file with "delete 2|21|201"
"+2|22|202"));
@@ -149,6 +150,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
write.write(GenericRowData.of(1, 11, 101L));
table.newCommit().commit("0", write.prepareCommit());
+ write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(1, 12, 102L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(2, 21, 201L));
@@ -156,6 +158,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
write.write(GenericRowData.of(1, 11, 101L));
write.write(GenericRowData.of(2, 22, 202L));
+ write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
table.newCommit().commit("2", write.prepareCommit());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index b5da3cd4..08463278 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -57,9 +57,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("1|10|1000"));
+ .isEqualTo(Collections.singletonList("1|10|1000"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("2|21|20001", "2|22|202"));
+ .isEqualTo(Arrays.asList("2|21|20001", "2|22|202"));
}
@Test
@@ -70,9 +70,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("1000|10"));
+ .isEqualTo(Collections.singletonList("1000|10"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("20001|21", "202|22"));
+ .isEqualTo(Arrays.asList("20001|21", "202|22"));
}
@Test
@@ -91,7 +91,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(
+ .isEqualTo(
Arrays.asList(
// only filter on key should be performed,
// and records from the same file should also be selected
@@ -106,9 +106,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
TableRead read = table.newRead().withIncremental(true);
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("-1|11|1001"));
+ .isEqualTo(Collections.singletonList("-1|11|1001"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("+2|21|20001", "+2|22|202", "-2|20|200"));
+ .isEqualTo(Arrays.asList("-2|20|200", "+2|21|20001", "+2|22|202"));
}
@Test
@@ -120,9 +120,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("-1001|11"));
+ .isEqualTo(Collections.singletonList("-1001|11"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("+20001|21", "+202|22", "-200|20"));
+ .isEqualTo(Arrays.asList("-200|20", "+20001|21", "+202|22"));
}
@Test
@@ -142,11 +142,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead().withIncremental(true);
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .hasSameElementsAs(
+ .isEqualTo(
Arrays.asList(
// only filter on key should be performed,
// and records from the same file should also be selected
- "+2|21|20001", "+2|22|202", "-2|20|200"));
+ "-2|20|200", "+2|21|20001", "+2|22|202"));
}
private void writeData() throws Exception {
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
index 53a031c5..75ba824c 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
@@ -22,14 +22,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.AppendOnlyWriter;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.data.DataFileTest;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -68,8 +66,6 @@ public class FileFormatSuffixTest extends DataFileTest {
AppendOnlyWriter appendOnlyWriter =
new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
appendOnlyWriter.write(
- ValueKind.ADD,
- BinaryRowDataUtil.EMPTY_ROW,
GenericRowData.of(1, StringData.fromString("aaa"), StringData.fromString("1")));
List<DataFileMeta> result = appendOnlyWriter.close();