You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/09 07:50:55 UTC

[flink-table-store] branch master updated: [FLINK-26441] Add a new OVERWRITE snapshot commit kind

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a3014f5  [FLINK-26441] Add a new OVERWRITE snapshot commit kind
a3014f5 is described below

commit a3014f57c901767bd8ff5fbbc28f782112130499
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Mar 9 15:50:16 2022 +0800

    [FLINK-26441] Add a new OVERWRITE snapshot commit kind
    
    This closes #37
---
 .../org/apache/flink/table/store/file/Snapshot.java     |  5 ++++-
 .../table/store/file/operation/FileStoreCommitImpl.java | 17 +++++++----------
 .../table/store/file/operation/FileStoreCommitTest.java | 15 ++++++++++-----
 3 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 6bb343b..6fc84ce 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -163,6 +163,9 @@ public class Snapshot {
         APPEND,
 
         /** Changes by compacting existing sst files. */
-        COMPACT
+        COMPACT,
+
+        /** Changes that clear the whole partition and then add new records. */
+        OVERWRITE
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 9d0212f..d39f05a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -190,11 +190,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
         }
         // overwrite new files
-        tryOverwrite(
-                partitionFilter,
-                appendChanges,
-                committable.identifier(),
-                Snapshot.CommitKind.APPEND);
+        tryOverwrite(partitionFilter, appendChanges, committable.identifier());
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
@@ -218,10 +214,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     }
 
     private void tryOverwrite(
-            Predicate partitionFilter,
-            List<ManifestEntry> changes,
-            String identifier,
-            Snapshot.CommitKind commitKind) {
+            Predicate partitionFilter, List<ManifestEntry> changes, String identifier) {
         while (true) {
             Long latestSnapshotId = pathFactory.latestSnapshotId();
 
@@ -245,7 +238,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             changesWithOverwrite.addAll(changes);
 
             if (tryCommitOnce(
-                    changesWithOverwrite, identifier, commitKind, latestSnapshotId, false)) {
+                    changesWithOverwrite,
+                    identifier,
+                    Snapshot.CommitKind.OVERWRITE,
+                    latestSnapshotId,
+                    false)) {
                 break;
             }
         }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index c558ed4..21d0a63 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
@@ -176,11 +177,15 @@ public class FileStoreCommitTest {
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()),
                 "data2");
-        store.overwriteData(
-                data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
-                gen::getPartition,
-                kv -> 0,
-                partitionToOverwrite);
+        List<Snapshot> overwriteSnapshots =
+                store.overwriteData(
+                        data2.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                        gen::getPartition,
+                        kv -> 0,
+                        partitionToOverwrite);
+        assertThat(overwriteSnapshots.get(0).commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
 
         List<KeyValue> expectedKvs = new ArrayList<>();
         for (Map.Entry<BinaryRowData, List<KeyValue>> entry : data1.entrySet()) {