Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/17 03:31:39 UTC
[incubator-paimon] branch master updated: [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite (#605)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1af16a5a [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite (#605)
1af16a5a is described below
commit 1af16a5a1cb27b6b72bbcef9e5862fda84d8c996
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Mar 17 11:31:34 2023 +0800
[FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite (#605)
---
.../file/append/AppendOnlyCompactManager.java | 5 +
.../table/store/file/append/AppendOnlyWriter.java | 29 +--
.../table/store/file/compact/CompactManager.java | 3 +
.../store/file/compact/NoopCompactManager.java | 7 +
.../store/file/mergetree/MergeTreeWriter.java | 33 ++--
.../mergetree/compact/MergeTreeCompactManager.java | 5 +
.../file/operation/AbstractFileStoreWrite.java | 210 ++++++++++++++++-----
.../file/operation/AppendOnlyFileStoreWrite.java | 30 +--
.../file/operation/KeyValueFileStoreWrite.java | 32 +---
.../table/store/file/utils/CommitIncrement.java | 47 +++++
.../flink/table/store/file/utils/RecordWriter.java | 13 +-
.../utils/Restorable.java} | 23 +--
.../table/store/table/sink/TableWriteImpl.java | 19 +-
.../table/store/table/source/StreamTableScan.java | 13 +-
.../flink/table/store/file/TestFileStore.java | 17 +-
.../store/file/append/AppendOnlyWriterTest.java | 14 +-
.../store/file/format/FileFormatSuffixTest.java | 7 +-
.../table/store/file/mergetree/MergeTreeTest.java | 10 +-
.../store/file/operation/TestCommitThread.java | 23 +--
.../table/store/table/sink/TableWriteTest.java | 187 ++++++++++++++++++
.../source/FileStoreSourceReaderTest.java | 2 +-
.../source/FileStoreSourceSplitReaderTest.java | 33 +---
.../source/TestChangelogDataReadWrite.java | 10 +-
23 files changed, 538 insertions(+), 234 deletions(-)
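In short, this change lets a TableWriteImpl extract its in-flight state and hand it to a brand-new instance. A minimal usage sketch, distilled from the new TableWriteTest added below (table, commitUser and the row values are illustrative placeholders):

    // Old instance: write some data, then extract its state and close it.
    TableWriteImpl<?> write = table.newWrite(commitUser);
    write.write(GenericRow.of(0, 1, 100L));
    List<AbstractFileStoreWrite.State> state = write.checkpoint();
    write.close();

    // New instance: load the extracted state and keep writing.
    TableWriteImpl<?> newWrite = table.newWrite(commitUser);
    newWrite.restore(state);
    newWrite.write(GenericRow.of(0, 2, 200L));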
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
index 132995e0..c7519a54 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
@@ -116,6 +116,11 @@ public class AppendOnlyCompactManager extends CompactFutureManager {
toCompact.add(file);
}
+ @Override
+ public List<DataFileMeta> allFiles() {
+ return toCompact;
+ }
+
/** Finish current task, and update result files to {@link #toCompact}. */
@Override
public Optional<CompactResult> getCompactionResult(boolean blocking)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index 6ec7883c..91005835 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.file.io.NewFilesIncrement;
import org.apache.flink.table.store.file.io.RowDataRollingFileWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.fs.FileIO;
@@ -34,6 +35,8 @@ import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.utils.LongCounter;
import org.apache.flink.table.store.utils.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -71,7 +74,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
long maxSequenceNumber,
CompactManager compactManager,
boolean forceCompact,
- DataFilePathFactory pathFactory) {
+ DataFilePathFactory pathFactory,
+ @Nullable CommitIncrement increment) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -86,6 +90,12 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.writer = createRollingRowWriter();
+
+ if (increment != null) {
+ newFiles.addAll(increment.newFilesIncrement().newFiles());
+ compactBefore.addAll(increment.compactIncrement().compactBefore());
+ compactAfter.addAll(increment.compactIncrement().compactAfter());
+ }
}
@Override
@@ -107,6 +117,11 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
files.forEach(compactManager::addNewFile);
}
+ @Override
+ public List<DataFileMeta> dataFiles() {
+ return compactManager.allFiles();
+ }
+
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
flushWriter(false, false);
@@ -186,16 +201,6 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
compactBefore.clear();
compactAfter.clear();
- return new CommitIncrement() {
- @Override
- public NewFilesIncrement newFilesIncrement() {
- return newFilesIncrement;
- }
-
- @Override
- public CompactIncrement compactIncrement() {
- return compactIncrement;
- }
- };
+ return new CommitIncrement(newFilesIncrement, compactIncrement);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index 982405d4..24d0dfdb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.compact;
import org.apache.flink.table.store.file.io.DataFileMeta;
import java.io.Closeable;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -33,6 +34,8 @@ public interface CompactManager extends Closeable {
/** Add a new file. */
void addNewFile(DataFileMeta file);
+ List<DataFileMeta> allFiles();
+
/**
* Trigger a new compaction task.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index 3aff478d..071dbd76 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.utils.Preconditions;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -39,6 +41,11 @@ public class NoopCompactManager implements CompactManager {
@Override
public void addNewFile(DataFileMeta file) {}
+ @Override
+ public List<DataFileMeta> allFiles() {
+ return Collections.emptyList();
+ }
+
@Override
public void triggerCompaction(boolean fullCompaction) {
Preconditions.checkArgument(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 28a1f742..4111e7f8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -33,9 +33,12 @@ import org.apache.flink.table.store.file.io.RollingFileWriter;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.memory.MemorySegmentPool;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.types.RowType;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
@@ -80,7 +83,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
MergeFunction<KeyValue> mergeFunction,
KeyValueFileWriterFactory writerFactory,
boolean commitForceCompact,
- ChangelogProducer changelogProducer) {
+ ChangelogProducer changelogProducer,
+ @Nullable CommitIncrement increment) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.ioManager = ioManager;
@@ -99,6 +103,16 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
this.compactBefore = new LinkedHashMap<>();
this.compactAfter = new LinkedHashSet<>();
this.compactChangelog = new LinkedHashSet<>();
+ if (increment != null) {
+ newFiles.addAll(increment.newFilesIncrement().newFiles());
+ newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles());
+ increment
+ .compactIncrement()
+ .compactBefore()
+ .forEach(f -> compactBefore.put(f.fileName(), f));
+ compactAfter.addAll(increment.compactIncrement().compactAfter());
+ compactChangelog.addAll(increment.compactIncrement().changelogFiles());
+ }
}
private long newSequenceNumber() {
@@ -148,6 +162,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
files.forEach(compactManager::addNewFile);
}
+ @Override
+ public List<DataFileMeta> dataFiles() {
+ return compactManager.allFiles();
+ }
+
@Override
public long memoryOccupancy() {
return writeBuffer.memoryOccupancy();
@@ -232,17 +251,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
compactAfter.clear();
compactChangelog.clear();
- return new CommitIncrement() {
- @Override
- public NewFilesIncrement newFilesIncrement() {
- return newFilesIncrement;
- }
-
- @Override
- public CompactIncrement compactIncrement() {
- return compactIncrement;
- }
- };
+ return new CommitIncrement(newFilesIncrement, compactIncrement);
}
private void updateCompactResult(CompactResult result) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 58c02c4c..9efc971b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -79,6 +79,11 @@ public class MergeTreeCompactManager extends CompactFutureManager {
levels.addLevel0File(file);
}
+ @Override
+ public List<DataFileMeta> allFiles() {
+ return levels.allFiles();
+ }
+
@Override
public void triggerCompaction(boolean fullCompaction) {
Optional<CompactUnit> optionalUnit;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 74d581df..3a3e79a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -24,8 +24,10 @@ import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.disk.IOManager;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.ExecutorThreadFactory;
import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.Restorable;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.CommitMessage;
import org.apache.flink.table.store.table.sink.CommitMessageImpl;
@@ -50,12 +52,13 @@ import java.util.concurrent.Executors;
*
* @param <T> type of record to write.
*/
-public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
+public abstract class AbstractFileStoreWrite<T>
+ implements FileStoreWrite<T>, Restorable<List<AbstractFileStoreWrite.State>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
private final String commitUser;
- protected final SnapshotManager snapshotManager;
+ private final SnapshotManager snapshotManager;
private final FileStoreScan scan;
@Nullable protected IOManager ioManager;
@@ -74,35 +77,12 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
this.writers = new HashMap<>();
}
- private ExecutorService compactExecutor() {
- if (lazyCompactExecutor == null) {
- lazyCompactExecutor =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory(
- Thread.currentThread().getName() + "-compaction"));
- }
- return lazyCompactExecutor;
- }
-
@Override
public FileStoreWrite<T> withIOManager(IOManager ioManager) {
this.ioManager = ioManager;
return this;
}
- protected List<DataFileMeta> scanExistingFileMetas(
- Long snapshotId, BinaryRow partition, int bucket) {
- List<DataFileMeta> existingFileMetas = new ArrayList<>();
- if (snapshotId != null) {
- // Concat all the DataFileMeta of existing files into existingFileMetas.
- scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
- .withBucket(bucket).plan().files().stream()
- .map(ManifestEntry::file)
- .forEach(existingFileMetas::add);
- }
- return existingFileMetas;
- }
-
public void withOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
@@ -177,8 +157,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
int bucket = entry.getKey();
WriterContainer<T> writerContainer = entry.getValue();
- RecordWriter.CommitIncrement increment =
- writerContainer.writer.prepareCommit(waitCompaction);
+ CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
@@ -232,6 +211,68 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
}
}
+ @Override
+ public List<State> checkpoint() {
+ List<State> result = new ArrayList<>();
+
+ for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry :
+ writers.entrySet()) {
+ BinaryRow partition = partitionEntry.getKey();
+ for (Map.Entry<Integer, WriterContainer<T>> bucketEntry :
+ partitionEntry.getValue().entrySet()) {
+ int bucket = bucketEntry.getKey();
+ WriterContainer<T> writerContainer = bucketEntry.getValue();
+
+ CommitIncrement increment;
+ try {
+ increment = writerContainer.writer.prepareCommit(false);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to extract state from writer of partition "
+ + partition
+ + " bucket "
+ + bucket,
+ e);
+ }
+ // writer.dataFiles() must be fetched after writer.prepareCommit(), because
+ // compaction result might be updated during prepareCommit
+ List<DataFileMeta> dataFiles = writerContainer.writer.dataFiles();
+ result.add(
+ new State(
+ partition,
+ bucket,
+ writerContainer.baseSnapshotId,
+ writerContainer.lastModifiedCommitIdentifier,
+ dataFiles,
+ increment));
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Extracted state " + result.toString());
+ }
+ return result;
+ }
+
+ @Override
+ public void restore(List<State> states) {
+ for (State state : states) {
+ RecordWriter<T> writer =
+ createWriter(
+ state.partition,
+ state.bucket,
+ state.dataFiles,
+ state.commitIncrement,
+ compactExecutor());
+ notifyNewWriter(writer);
+ WriterContainer<T> writerContainer =
+ new WriterContainer<>(writer, state.baseSnapshotId);
+ writerContainer.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier;
+ writers.computeIfAbsent(state.partition, k -> new HashMap<>())
+ .put(state.bucket, writerContainer);
+ }
+ }
+
private WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
if (buckets == null) {
@@ -239,32 +280,70 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
writers.put(partition.copy(), buckets);
}
return buckets.computeIfAbsent(
- bucket, k -> createWriterContainer(partition.copy(), bucket));
+ bucket, k -> createWriterContainer(partition.copy(), bucket, overwrite));
}
- private WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket) {
+ @VisibleForTesting
+ public WriterContainer<T> createWriterContainer(
+ BinaryRow partition, int bucket, boolean emptyWriter) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}
- WriterContainer<T> writerContainer =
- overwrite
- ? createEmptyWriterContainer(partition.copy(), bucket, compactExecutor())
- : createWriterContainer(partition.copy(), bucket, compactExecutor());
- notifyNewWriter(writerContainer.writer);
- return writerContainer;
+
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ RecordWriter<T> writer;
+ if (emptyWriter) {
+ writer =
+ createWriter(
+ partition.copy(),
+ bucket,
+ Collections.emptyList(),
+ null,
+ compactExecutor());
+ } else {
+ writer =
+ createWriter(
+ partition.copy(),
+ bucket,
+ scanExistingFileMetas(latestSnapshotId, partition, bucket),
+ null,
+ compactExecutor());
+ }
+ notifyNewWriter(writer);
+ return new WriterContainer<>(writer, latestSnapshotId);
}
- protected void notifyNewWriter(RecordWriter<T> writer) {}
+ private List<DataFileMeta> scanExistingFileMetas(
+ Long snapshotId, BinaryRow partition, int bucket) {
+ List<DataFileMeta> existingFileMetas = new ArrayList<>();
+ if (snapshotId != null) {
+ // Concat all the DataFileMeta of existing files into existingFileMetas.
+ scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
+ .withBucket(bucket).plan().files().stream()
+ .map(ManifestEntry::file)
+ .forEach(existingFileMetas::add);
+ }
+ return existingFileMetas;
+ }
- /** Create a {@link RecordWriter} from partition and bucket. */
- @VisibleForTesting
- public abstract WriterContainer<T> createWriterContainer(
- BinaryRow partition, int bucket, ExecutorService compactExecutor);
+ private ExecutorService compactExecutor() {
+ if (lazyCompactExecutor == null) {
+ lazyCompactExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ Thread.currentThread().getName() + "-compaction"));
+ }
+ return lazyCompactExecutor;
+ }
- /** Create an empty {@link RecordWriter} from partition and bucket. */
- @VisibleForTesting
- public abstract WriterContainer<T> createEmptyWriterContainer(
- BinaryRow partition, int bucket, ExecutorService compactExecutor);
+ protected void notifyNewWriter(RecordWriter<T> writer) {}
+
+ protected abstract RecordWriter<T> createWriter(
+ BinaryRow partition,
+ int bucket,
+ List<DataFileMeta> restoreFiles,
+ @Nullable CommitIncrement restoreIncrement,
+ ExecutorService compactExecutor);
/**
* {@link RecordWriter} with the snapshot id it is created upon and the identifier of its last
@@ -272,10 +351,9 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
*/
@VisibleForTesting
public static class WriterContainer<T> {
-
public final RecordWriter<T> writer;
- private final long baseSnapshotId;
- private long lastModifiedCommitIdentifier;
+ protected final long baseSnapshotId;
+ protected long lastModifiedCommitIdentifier;
protected WriterContainer(RecordWriter<T> writer, Long baseSnapshotId) {
this.writer = writer;
@@ -284,4 +362,42 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
}
}
+
+ /** Recoverable state of {@link AbstractFileStoreWrite}. */
+ public static class State {
+ protected final BinaryRow partition;
+ protected final int bucket;
+
+ protected final long baseSnapshotId;
+ protected final long lastModifiedCommitIdentifier;
+ protected final List<DataFileMeta> dataFiles;
+ protected final CommitIncrement commitIncrement;
+
+ protected State(
+ BinaryRow partition,
+ int bucket,
+ long baseSnapshotId,
+ long lastModifiedCommitIdentifier,
+ List<DataFileMeta> dataFiles,
+ CommitIncrement commitIncrement) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.baseSnapshotId = baseSnapshotId;
+ this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
+ this.dataFiles = dataFiles;
+ this.commitIncrement = commitIncrement;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "{%s, %d, %d, %d, %s, %s}",
+ partition,
+ bucket,
+ baseSnapshotId,
+ lastModifiedCommitIdentifier,
+ dataFiles,
+ commitIncrement);
+ }
+ }
}
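One subtlety in the checkpoint() implementation above is the call order: prepareCommit() may finish and apply a pending compaction, which changes the set of files the writer tracks, so the file list must be read afterwards. Condensed from the method body:

    // Order matters: prepareCommit() can apply a pending compaction result,
    // mutating the writer's file set.
    CommitIncrement increment = writerContainer.writer.prepareCommit(false);
    // Only now is the writer's file list stable for this checkpoint.
    List<DataFileMeta> dataFiles = writerContainer.writer.dataFiles();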
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index c0b106a6..e46a4e8a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.compact.NoopCompactManager;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.file.io.RowDataRollingFileWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -38,6 +39,8 @@ import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.utils.LongCounter;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -85,31 +88,11 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
}
@Override
- public WriterContainer<InternalRow> createWriterContainer(
- BinaryRow partition, int bucket, ExecutorService compactExecutor) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- RecordWriter<InternalRow> writer =
- createWriter(
- partition,
- bucket,
- scanExistingFileMetas(latestSnapshotId, partition, bucket),
- compactExecutor);
- return new WriterContainer<>(writer, latestSnapshotId);
- }
-
- @Override
- public WriterContainer<InternalRow> createEmptyWriterContainer(
- BinaryRow partition, int bucket, ExecutorService compactExecutor) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- RecordWriter<InternalRow> writer =
- createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
- return new WriterContainer<>(writer, latestSnapshotId);
- }
-
- private RecordWriter<InternalRow> createWriter(
+ protected RecordWriter<InternalRow> createWriter(
BinaryRow partition,
int bucket,
List<DataFileMeta> restoredFiles,
+ @Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor) {
// let writer and compact manager hold the same reference
// and make restore files mutable to update
@@ -136,7 +119,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
getMaxSequenceNumber(restored),
compactManager,
commitForceCompact,
- factory);
+ factory,
+ restoreIncrement);
}
private AppendOnlyCompactManager.CompactRewriter compactRewriter(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 97778e29..cd5822a4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -42,8 +42,8 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactRewri
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.format.FileFormatDiscover;
import org.apache.flink.table.store.fs.FileIO;
@@ -53,7 +53,8 @@ import org.apache.flink.table.store.types.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import javax.annotation.Nullable;
+
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -118,31 +119,11 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
}
@Override
- public WriterContainer<KeyValue> createWriterContainer(
- BinaryRow partition, int bucket, ExecutorService compactExecutor) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- RecordWriter<KeyValue> writer =
- createMergeTreeWriter(
- partition,
- bucket,
- scanExistingFileMetas(latestSnapshotId, partition, bucket),
- compactExecutor);
- return new WriterContainer<>(writer, latestSnapshotId);
- }
-
- @Override
- public WriterContainer<KeyValue> createEmptyWriterContainer(
- BinaryRow partition, int bucket, ExecutorService compactExecutor) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- RecordWriter<KeyValue> writer =
- createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
- return new WriterContainer<>(writer, latestSnapshotId);
- }
-
- private MergeTreeWriter createMergeTreeWriter(
+ protected MergeTreeWriter createWriter(
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
+ @Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor) {
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -178,7 +159,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
mfFactory.create(),
writerFactory,
options.commitForceCompact(),
- options.changelogProducer());
+ options.changelogProducer(),
+ restoreIncrement);
}
private boolean bufferSpillable() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/CommitIncrement.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/CommitIncrement.java
new file mode 100644
index 00000000..e1aa61ec
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/CommitIncrement.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.io.CompactIncrement;
+import org.apache.flink.table.store.file.io.NewFilesIncrement;
+
+/** Changes to commit. */
+public class CommitIncrement {
+
+ private final NewFilesIncrement newFilesIncrement;
+ private final CompactIncrement compactIncrement;
+
+ public CommitIncrement(NewFilesIncrement newFilesIncrement, CompactIncrement compactIncrement) {
+ this.newFilesIncrement = newFilesIncrement;
+ this.compactIncrement = compactIncrement;
+ }
+
+ public NewFilesIncrement newFilesIncrement() {
+ return newFilesIncrement;
+ }
+
+ public CompactIncrement compactIncrement() {
+ return compactIncrement;
+ }
+
+ @Override
+ public String toString() {
+ return newFilesIncrement.toString() + "\n" + compactIncrement;
+ }
+}
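Call sites unpack the new value class exactly as they unpacked the old RecordWriter.CommitIncrement inner interface; for example (variable names as in the TestCommitThread changes further down):

    CommitIncrement inc = writer.prepareCommit(true);
    committable.addFileCommittable(
            new CommitMessageImpl(
                    partition, 0, inc.newFilesIncrement(), inc.compactIncrement()));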
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index 6c561486..447a23f1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -18,9 +18,7 @@
package org.apache.flink.table.store.file.utils;
-import org.apache.flink.table.store.file.io.CompactIncrement;
import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.io.NewFilesIncrement;
import java.util.List;
@@ -51,6 +49,9 @@ public interface RecordWriter<T> {
*/
void addNewFiles(List<DataFileMeta> files);
+ /** Get all data files maintained by this writer. */
+ List<DataFileMeta> dataFiles();
+
/**
* Prepare for a commit.
*
@@ -67,12 +68,4 @@ public interface RecordWriter<T> {
/** Close this writer, the call will delete newly generated but not committed files. */
void close() throws Exception;
-
- /** Changes to commit. */
- interface CommitIncrement {
-
- NewFilesIncrement newFilesIncrement();
-
- CompactIncrement compactIncrement();
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/Restorable.java
similarity index 59%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/Restorable.java
index 98526a1e..f56ccf5f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/Restorable.java
@@ -16,24 +16,19 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.table.source;
-
-import org.apache.flink.table.store.annotation.Experimental;
-
-import javax.annotation.Nullable;
+package org.apache.flink.table.store.file.utils;
/**
- * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
+ * Operations implementing this interface can checkpoint and restore their states between different
+ * instances.
*
- * @since 0.4.0
+ * @param <S> type of state
*/
-@Experimental
-public interface StreamTableScan extends TableScan {
+public interface Restorable<S> {
- /** Checkpoint this stream table scan, return next snapshot id. */
- @Nullable
- Long checkpoint();
+ /** Extract state of the current operation instance. */
+ S checkpoint();
- /** Restore this stream table scan, read incremental from {@code nextSnapshotId}. */
- void restore(@Nullable Long nextSnapshotId);
+ /** Restore state of a previous operation instance into the current operation instance. */
+ void restore(S state);
}
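The interface is intentionally tiny. A toy implementation, purely illustrative (Counter is not part of this commit), to show the contract:

    import org.apache.flink.table.store.file.utils.Restorable;

    /** Illustrative only: state of type Long migrates between instances. */
    class Counter implements Restorable<Long> {
        private long count = 0;

        void increment() {
            count++;
        }

        @Override
        public Long checkpoint() {
            return count; // extract state of this instance
        }

        @Override
        public void restore(Long state) {
            this.count = state; // load a previous instance's state
        }
    }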
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index 2dd51b92..b304d22d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -22,7 +22,9 @@ import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.disk.IOManager;
import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.Restorable;
import java.util.List;
@@ -33,9 +35,10 @@ import static org.apache.flink.table.store.utils.Preconditions.checkState;
*
* @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
*/
-public class TableWriteImpl<T> implements InnerTableWrite {
+public class TableWriteImpl<T>
+ implements InnerTableWrite, Restorable<List<AbstractFileStoreWrite.State>> {
- private final FileStoreWrite<T> write;
+ private final AbstractFileStoreWrite<T> write;
private final SinkRecordConverter recordConverter;
private final RecordExtractor<T> recordExtractor;
@@ -45,7 +48,7 @@ public class TableWriteImpl<T> implements InnerTableWrite {
FileStoreWrite<T> write,
SinkRecordConverter recordConverter,
RecordExtractor<T> recordExtractor) {
- this.write = write;
+ this.write = (AbstractFileStoreWrite<T>) write;
this.recordConverter = recordConverter;
this.recordExtractor = recordExtractor;
}
@@ -121,6 +124,16 @@ public class TableWriteImpl<T> implements InnerTableWrite {
write.close();
}
+ @Override
+ public List<AbstractFileStoreWrite.State> checkpoint() {
+ return write.checkpoint();
+ }
+
+ @Override
+ public void restore(List<AbstractFileStoreWrite.State> state) {
+ write.restore(state);
+ }
+
/** Extractor to extract {@link T} from the {@link SinkRecord}. */
public interface RecordExtractor<T> {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
index 98526a1e..9fd99f51 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
@@ -19,8 +19,7 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.store.annotation.Experimental;
-
-import javax.annotation.Nullable;
+import org.apache.flink.table.store.file.utils.Restorable;
/**
* {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
@@ -28,12 +27,4 @@ import javax.annotation.Nullable;
* @since 0.4.0
*/
@Experimental
-public interface StreamTableScan extends TableScan {
-
- /** Checkpoint this stream table scan, return next snapshot id. */
- @Nullable
- Long checkpoint();
-
- /** Restore this stream table scan, read incremental from {@code nextSnapshotId}. */
- void restore(@Nullable Long nextSnapshotId);
-}
+public interface StreamTableScan extends TableScan, Restorable<Long> {}
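For StreamTableScan the Long state is still the next snapshot id, and per the removed javadoc it may be null before any snapshot exists. A hedged helper sketch of the unchanged call pattern (the scan instances come from wherever the caller already builds them):

    static void migrateScanState(StreamTableScan oldScan, StreamTableScan newScan) {
        Long nextSnapshotId = oldScan.checkpoint(); // may be null
        newScan.restore(nextSnapshotId);            // resume incremental reads from it
    }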
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index c776b39a..3261ee59 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -65,8 +66,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -214,15 +213,10 @@ public class TestFileStore extends KeyValueFileStore {
bucket,
(b, w) -> {
if (w == null) {
- ExecutorService service = Executors.newSingleThreadExecutor();
RecordWriter<KeyValue> writer =
- emptyWriter
- ? write.createEmptyWriterContainer(
- partition, bucket, service)
- .writer
- : write.createWriterContainer(
- partition, bucket, service)
- .writer;
+ write.createWriterContainer(
+ partition, bucket, emptyWriter)
+ .writer;
((MemoryOwner) writer)
.setMemoryPool(
new HeapMemorySegmentPool(
@@ -244,8 +238,7 @@ public class TestFileStore extends KeyValueFileStore {
writers.entrySet()) {
for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
entryWithPartition.getValue().entrySet()) {
- RecordWriter.CommitIncrement increment =
- entryWithBucket.getValue().prepareCommit(emptyWriter);
+ CommitIncrement increment = entryWithBucket.getValue().prepareCommit(emptyWriter);
committable.addFileCommittable(
new CommitMessageImpl(
entryWithPartition.getKey(),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
index 0c06c00b..fcd1ecdb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.ExecutorThreadFactory;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.format.FieldStats;
@@ -87,7 +88,7 @@ public class AppendOnlyWriterTest {
for (int i = 0; i < 3; i++) {
writer.sync();
- RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
+ CommitIncrement inc = writer.prepareCommit(true);
assertThat(inc.newFilesIncrement().isEmpty()).isTrue();
assertThat(inc.compactIncrement().isEmpty()).isTrue();
@@ -98,7 +99,7 @@ public class AppendOnlyWriterTest {
public void testSingleWrite() throws Exception {
RecordWriter<InternalRow> writer = createEmptyWriter(1024 * 1024L);
writer.write(row(1, "AAA", PART));
- RecordWriter.CommitIncrement increment = writer.prepareCommit(true);
+ CommitIncrement increment = writer.prepareCommit(true);
writer.close();
assertThat(increment.newFilesIncrement().newFiles().size()).isEqualTo(1);
@@ -140,7 +141,7 @@ public class AppendOnlyWriterTest {
}
writer.sync();
- RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
+ CommitIncrement inc = writer.prepareCommit(true);
if (txn > 0 && txn % 3 == 0) {
assertThat(inc.compactIncrement().compactBefore()).hasSize(4);
assertThat(inc.compactIncrement().compactAfter()).hasSize(1);
@@ -199,7 +200,7 @@ public class AppendOnlyWriterTest {
}
writer.sync();
- RecordWriter.CommitIncrement firstInc = writer.prepareCommit(true);
+ CommitIncrement firstInc = writer.prepareCommit(true);
assertThat(firstInc.compactIncrement().compactBefore()).isEqualTo(Collections.emptyList());
assertThat(firstInc.compactIncrement().compactAfter()).isEqualTo(Collections.emptyList());
@@ -241,7 +242,7 @@ public class AppendOnlyWriterTest {
assertThat(toCompact).containsExactlyElementsOf(firstInc.newFilesIncrement().newFiles());
writer.write(row(id, String.format("%03d", id), PART));
writer.sync();
- RecordWriter.CommitIncrement secInc = writer.prepareCommit(true);
+ CommitIncrement secInc = writer.prepareCommit(true);
// check compact before and after
List<DataFileMeta> compactBefore = secInc.compactIncrement().compactBefore();
@@ -328,7 +329,8 @@ public class AppendOnlyWriterTest {
generateCompactAfter(compactBefore)),
pathFactory),
forceCompact,
- pathFactory),
+ pathFactory,
+ null),
toCompact);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 8a1707e2..9024ac42 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.file.io.KeyValueFileReadWriteTest;
import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.fs.Path;
import org.apache.flink.table.store.fs.local.LocalFileIO;
@@ -79,10 +79,11 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
null,
dataFilePathFactory), // not used
false,
- dataFilePathFactory);
+ dataFilePathFactory,
+ null);
appendOnlyWriter.write(
GenericRow.of(1, BinaryString.fromString("aaa"), BinaryString.fromString("1")));
- RecordWriter.CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
+ CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
appendOnlyWriter.close();
DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 6707649c..3dcb3453 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.format.FileFormat;
@@ -254,7 +255,7 @@ public class MergeTreeTest {
}
writeAll(records);
expected.addAll(records);
- RecordWriter.CommitIncrement increment = writer.prepareCommit(true);
+ CommitIncrement increment = writer.prepareCommit(true);
mergeCompacted(newFileNames, compactedFiles, increment);
}
writer.close();
@@ -285,7 +286,7 @@ public class MergeTreeTest {
writer.sync();
}
- RecordWriter.CommitIncrement increment = writer.prepareCommit(true);
+ CommitIncrement increment = writer.prepareCommit(true);
newFiles.addAll(increment.newFilesIncrement().newFiles());
mergeCompacted(newFileNames, compactedFiles, increment);
}
@@ -327,7 +328,8 @@ public class MergeTreeTest {
DeduplicateMergeFunction.factory().create(),
writerFactory,
options.commitForceCompact(),
- ChangelogProducer.NONE);
+ ChangelogProducer.NONE,
+ null);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return writer;
@@ -354,7 +356,7 @@ public class MergeTreeTest {
private void mergeCompacted(
Set<String> newFileNames,
List<DataFileMeta> compactedFiles,
- RecordWriter.CommitIncrement increment) {
+ CommitIncrement increment) {
increment.newFilesIncrement().newFiles().stream()
.map(DataFileMeta::fileName)
.forEach(newFileNames::add);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 4780421c..232c6f7f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
import org.apache.flink.table.store.table.sink.CommitMessageImpl;
import org.apache.flink.table.store.types.RowType;
@@ -41,8 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -139,7 +137,7 @@ public class TestCommitThread extends Thread {
}
ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
for (Map.Entry<BinaryRow, MergeTreeWriter> entry : writers.entrySet()) {
- RecordWriter.CommitIncrement inc = entry.getValue().prepareCommit(true);
+ CommitIncrement inc = entry.getValue().prepareCommit(true);
committable.addFileCommittable(
new CommitMessageImpl(
entry.getKey(), 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -151,7 +149,7 @@ public class TestCommitThread extends Thread {
private void doOverwrite() throws Exception {
BinaryRow partition = overwriteData();
ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
- RecordWriter.CommitIncrement inc = writers.get(partition).prepareCommit(true);
+ CommitIncrement inc = writers.get(partition).prepareCommit(true);
committable.addFileCommittable(
new CommitMessageImpl(
partition, 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -174,7 +172,7 @@ public class TestCommitThread extends Thread {
MergeTreeWriter writer =
writers.computeIfAbsent(partition, p -> createWriter(p, false));
writer.compact(true);
- RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
+ CommitIncrement inc = writer.prepareCommit(true);
committable.addFileCommittable(
new CommitMessageImpl(
partition, 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -275,19 +273,8 @@ public class TestCommitThread extends Thread {
}
private MergeTreeWriter createWriter(BinaryRow partition, boolean empty) {
- ExecutorService service =
- Executors.newSingleThreadExecutor(
- r -> {
- Thread t = new Thread(r);
- t.setName(Thread.currentThread().getName() + "-writer-service-pool");
- return t;
- });
MergeTreeWriter writer =
- empty
- ? (MergeTreeWriter)
- write.createEmptyWriterContainer(partition, 0, service).writer
- : (MergeTreeWriter)
- write.createWriterContainer(partition, 0, service).writer;
+ (MergeTreeWriter) write.createWriterContainer(partition, 0, empty).writer;
writer.setMemoryPool(
new HeapMemorySegmentPool(
WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/TableWriteTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/TableWriteTest.java
new file mode 100644
index 00000000..c9f6cd93
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/TableWriteTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.SchemaUtils;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.utils.TraceableFileIO;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.reader.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypes;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TableWriteImpl}. */
+public class TableWriteTest {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private Path tablePath;
+ private String commitUser;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ @AfterEach
+ public void after() {
+ // assert all connections are closed
+ Predicate<Path> pathPredicate = path -> path.toString().contains(tempDir.toString());
+ assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+ assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+ }
+
+ @Test
+ public void testExtractAndRecoverState() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int commitCount = random.nextInt(10) + 1;
+ int extractCount = random.nextInt(10) + 1;
+ int numRecords = 1000;
+ int numPartitions = 2;
+ int numKeys = 100;
+
+ Map<Integer, List<Event>> events = new HashMap<>();
+ for (int i = 0; i < commitCount; i++) {
+ int prepareTime = random.nextInt(numRecords);
+ int commitTime = random.nextInt(prepareTime, numRecords);
+ events.computeIfAbsent(prepareTime, k -> new ArrayList<>()).add(Event.PREPARE_COMMIT);
+ events.computeIfAbsent(commitTime, k -> new ArrayList<>()).add(Event.COMMIT);
+ }
+ for (int i = 0; i < extractCount; i++) {
+ int extractTime = random.nextInt(numRecords);
+ List<Event> eventList = events.computeIfAbsent(extractTime, k -> new ArrayList<>());
+ eventList.add(random.nextInt(eventList.size() + 1), Event.EXTRACT_STATE);
+ }
+
+ FileStoreTable table = createFileStoreTable();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ Map<String, Long> expected = new HashMap<>();
+ List<List<CommitMessage>> commitList = new ArrayList<>();
+ int commitId = 0;
+ for (int i = 0; i < numRecords; i++) {
+ if (events.containsKey(i)) {
+ List<Event> eventList = events.get(i);
+ for (Event event : eventList) {
+ switch (event) {
+ case PREPARE_COMMIT:
+ List<CommitMessage> messages =
+ write.prepareCommit(false, commitList.size());
+ commitList.add(messages);
+ break;
+ case COMMIT:
+ commit.commit(commitId, commitList.get(commitId));
+ commitId++;
+ break;
+ case EXTRACT_STATE:
+ List<AbstractFileStoreWrite.State> state = write.checkpoint();
+ write.close();
+ write = table.newWrite(commitUser);
+ write.restore(state);
+ break;
+ }
+ }
+ }
+
+ int partition = random.nextInt(numPartitions);
+ int key = random.nextInt(numKeys);
+ long value = random.nextLong();
+ write.write(GenericRow.of(partition, key, value));
+ expected.put(partition + "|" + key, value);
+ }
+
+ assertThat(commitId).isEqualTo(commitCount);
+ List<CommitMessage> messages = write.prepareCommit(false, commitCount);
+ commit.commit(commitCount, messages);
+ write.close();
+ commit.close();
+
+ Map<String, Long> actual = new HashMap<>();
+ TableScan.Plan plan = table.newScan().plan();
+ try (RecordReaderIterator<InternalRow> it =
+ new RecordReaderIterator<>(table.newRead().createReader(plan))) {
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ actual.put(row.getInt(0) + "|" + row.getInt(1), row.getLong(2));
+ }
+ }
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private enum Event {
+ PREPARE_COMMIT,
+ COMMIT,
+ EXTRACT_STATE
+ }
+
+ private FileStoreTable createFileStoreTable() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, 2);
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ conf.toMap(),
+ ""));
+ return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+ }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index 12c797d4..df4e3d6b 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -89,7 +89,7 @@ public class FileStoreSourceReaderTest {
return new FileStoreSourceReader<>(
RecordsFunction.forIterate(),
context,
- new TestChangelogDataReadWrite(tempDir.toString(), null).createReadWithKey(),
+ new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
null);
}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 9ab21e06..4c65cf54 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -38,8 +38,6 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -50,8 +48,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -65,21 +61,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link FileStoreSourceSplitReader}. */
public class FileStoreSourceSplitReaderTest {
- private static ExecutorService service;
-
@TempDir java.nio.file.Path tempDir;
- @BeforeAll
- public static void before() {
- service = Executors.newSingleThreadExecutor();
- }
-
- @AfterAll
- public static void after() {
- service.shutdownNow();
- service = null;
- }
-
@BeforeEach
public void beforeEach() throws Exception {
SchemaManager schemaManager =
@@ -126,7 +109,7 @@ public class FileStoreSourceSplitReaderTest {
}
private void innerTestOnce(boolean valueCountMode, int skip) throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(
valueCountMode ? rw.createReadWithValueCount() : rw.createReadWithKey(),
@@ -170,7 +153,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testPrimaryKeyWithDelete() throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(rw.createReadWithKey(), null);
@@ -213,7 +196,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testMultipleBatchInSplit() throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(rw.createReadWithKey(), null);
@@ -250,7 +233,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testRestore() throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(rw.createReadWithKey(), null);
@@ -277,7 +260,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testRestoreMultipleBatchInSplit() throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(rw.createReadWithKey(), null);
@@ -309,7 +292,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testMultipleSplits() throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(rw.createReadWithKey(), null);
@@ -348,7 +331,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testNoSplit() throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(rw.createReadWithKey(), null);
assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
@@ -357,7 +340,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testLimit() throws Exception {
- TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
createReader(rw.createReadWithKey(), 2L);
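The deleted executor scaffolding is not just cleanup: as the TestChangelogDataReadWrite changes below suggest, the compaction ExecutorService now appears to be managed inside the write path itself rather than supplied by callers, so the test helper shrinks to a single-argument constructor. A representative call site after the change, with partition, bucket and kvs as placeholders:

    TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
    // writeFiles no longer requires (or checks for) a caller-provided ExecutorService
    List<DataFileMeta> files = rw.writeFiles(partition, bucket, kvs);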
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 397bdf2f..294e9d9c 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -51,14 +51,12 @@ import org.apache.flink.table.store.types.DataField;
import org.apache.flink.table.store.types.IntType;
import org.apache.flink.table.store.types.RowKind;
import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.utils.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import static java.util.Collections.singletonList;
@@ -97,10 +95,9 @@ public class TestChangelogDataReadWrite {
private final Path tablePath;
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
- private final ExecutorService service;
private final String commitUser;
- public TestChangelogDataReadWrite(String root, ExecutorService service) {
+ public TestChangelogDataReadWrite(String root) {
this.avro = FileFormat.fromIdentifier("avro", new Options());
this.tablePath = new Path(root);
this.pathFactory =
@@ -110,7 +107,6 @@ public class TestChangelogDataReadWrite {
"default",
CoreOptions.FILE_FORMAT.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
- this.service = service;
this.commitUser = UUID.randomUUID().toString();
}
@@ -160,8 +156,6 @@ public class TestChangelogDataReadWrite {
public List<DataFileMeta> writeFiles(
BinaryRow partition, int bucket, List<Tuple2<Long, Long>> kvs) throws Exception {
- Preconditions.checkNotNull(
- service, "ExecutorService must be provided if writeFiles is needed");
RecordWriter<KeyValue> writer = createMergeTreeWriter(partition, bucket);
for (Tuple2<Long, Long> tuple2 : kvs) {
writer.write(
@@ -194,7 +188,7 @@ public class TestChangelogDataReadWrite {
null, // not used, we only create an empty writer
options,
EXTRACTOR)
- .createEmptyWriterContainer(partition, bucket, service)
+ .createWriterContainer(partition, bucket, true)
.writer;
((MemoryOwner) writer)
.setMemoryPool(