You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/09 07:50:55 UTC
[flink-table-store] branch master updated: [FLINK-26441] Add a new OVERWRITE snapshot commit kind
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a3014f5 [FLINK-26441] Add a new OVERWRITE snapshot commit kind
a3014f5 is described below
commit a3014f57c901767bd8ff5fbbc28f782112130499
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Mar 9 15:50:16 2022 +0800
[FLINK-26441] Add a new OVERWRITE snapshot commit kind
This closes #37
---
.../org/apache/flink/table/store/file/Snapshot.java | 5 ++++-
.../table/store/file/operation/FileStoreCommitImpl.java | 17 +++++++----------
.../table/store/file/operation/FileStoreCommitTest.java | 15 ++++++++++-----
3 files changed, 21 insertions(+), 16 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 6bb343b..6fc84ce 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -163,6 +163,9 @@ public class Snapshot {
APPEND,
/** Changes by compacting existing sst files. */
- COMPACT
+ COMPACT,
+
+ /** Changes that clear up the whole partition and then add new records. */
+ OVERWRITE
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 9d0212f..d39f05a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -190,11 +190,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
// overwrite new files
- tryOverwrite(
- partitionFilter,
- appendChanges,
- committable.identifier(),
- Snapshot.CommitKind.APPEND);
+ tryOverwrite(partitionFilter, appendChanges, committable.identifier());
List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
@@ -218,10 +214,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
private void tryOverwrite(
- Predicate partitionFilter,
- List<ManifestEntry> changes,
- String identifier,
- Snapshot.CommitKind commitKind) {
+ Predicate partitionFilter, List<ManifestEntry> changes, String identifier) {
while (true) {
Long latestSnapshotId = pathFactory.latestSnapshotId();
@@ -245,7 +238,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
changesWithOverwrite.addAll(changes);
if (tryCommitOnce(
- changesWithOverwrite, identifier, commitKind, latestSnapshotId, false)) {
+ changesWithOverwrite,
+ identifier,
+ Snapshot.CommitKind.OVERWRITE,
+ latestSnapshotId,
+ false)) {
break;
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index c558ed4..21d0a63 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
@@ -176,11 +177,15 @@ public class FileStoreCommitTest {
.flatMap(Collection::stream)
.collect(Collectors.toList()),
"data2");
- store.overwriteData(
- data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
- gen::getPartition,
- kv -> 0,
- partitionToOverwrite);
+ List<Snapshot> overwriteSnapshots =
+ store.overwriteData(
+ data2.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ gen::getPartition,
+ kv -> 0,
+ partitionToOverwrite);
+ assertThat(overwriteSnapshots.get(0).commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
List<KeyValue> expectedKvs = new ArrayList<>();
for (Map.Entry<BinaryRowData, List<KeyValue>> entry : data1.entrySet()) {