Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/10 09:27:53 UTC

[flink-table-store] branch master updated: [FLINK-26293] Store log offsets in snapshot

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e6037c9  [FLINK-26293] Store log offsets in snapshot
e6037c9 is described below

commit e6037c95a0e7fdc8724c5536d2d71951be4ce030
Author: SteNicholas <pr...@163.com>
AuthorDate: Thu Mar 10 17:27:48 2022 +0800

    [FLINK-26293] Store log offsets in snapshot
    
    This closes #31
---
 .../apache/flink/table/store/file/Snapshot.java    | 14 ++++-
 .../store/file/operation/FileStoreCommitImpl.java  | 41 ++++++++++++---
 .../flink/table/store/file/TestFileStore.java      |  6 +++
 .../store/file/operation/FileStoreCommitTest.java  | 61 ++++++++++++++++++++++
 4 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 6fc84ce..6d5c5d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -32,6 +32,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /** This file is the entrance to all data committed at some specific time point. */
 public class Snapshot {
@@ -45,6 +46,7 @@ public class Snapshot {
     private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
     private static final String FIELD_COMMIT_KIND = "commitKind";
     private static final String FIELD_TIME_MILLIS = "timeMillis";
+    private static final String FIELD_LOG_OFFSETS = "logOffsets";
 
     @JsonProperty(FIELD_ID)
     private final long id;
@@ -71,6 +73,9 @@ public class Snapshot {
     @JsonProperty(FIELD_TIME_MILLIS)
     private final long timeMillis;
 
+    @JsonProperty(FIELD_LOG_OFFSETS)
+    private final Map<Integer, Long> logOffsets;
+
     @JsonCreator
     public Snapshot(
             @JsonProperty(FIELD_ID) long id,
@@ -79,7 +84,8 @@ public class Snapshot {
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
             @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
-            @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
+            @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
+            @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) {
         this.id = id;
         this.baseManifestList = baseManifestList;
         this.deltaManifestList = deltaManifestList;
@@ -87,6 +93,7 @@ public class Snapshot {
         this.commitIdentifier = commitIdentifier;
         this.commitKind = commitKind;
         this.timeMillis = timeMillis;
+        this.logOffsets = logOffsets;
     }
 
     @JsonGetter(FIELD_ID)
@@ -124,6 +131,11 @@ public class Snapshot {
         return timeMillis;
     }
 
+    @JsonGetter(FIELD_LOG_OFFSETS)
+    public Map<Integer, Long> getLogOffsets() {
+        return logOffsets;
+    }
+
     public String toJson() {
         try {
             return new ObjectMapper().writeValueAsString(this);
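
For illustration, a minimal sketch (assuming the usual java.util.Map and
java.util.HashMap imports) of how the extended constructor and toJson() behave
with the new field; the manifest list names and bucket offsets below are
made-up values, not taken from the patch:

    Map<Integer, Long> logOffsets = new HashMap<>();
    logOffsets.put(0, 128L); // bucket 0 consumed the log up to offset 128
    logOffsets.put(1, 256L); // bucket 1 consumed the log up to offset 256

    Snapshot snapshot =
            new Snapshot(
                    1L,                         // id
                    "base-manifest-list",       // baseManifestList (hypothetical)
                    "delta-manifest-list",      // deltaManifestList (hypothetical)
                    "commit-user",              // commitUser
                    "commit-identifier",        // commitIdentifier
                    Snapshot.CommitKind.APPEND, // commitKind
                    System.currentTimeMillis(), // timeMillis
                    logOffsets);

    // toJson() now emits a "logOffsets" entry as well,
    // e.g. {"0":128,"1":256} (Jackson writes integer map keys as strings).
    String json = snapshot.toJson();
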
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index d39f05a..9673d02 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -151,13 +151,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
-        tryCommit(appendChanges, committable.identifier(), Snapshot.CommitKind.APPEND, false);
+        tryCommit(
+                appendChanges,
+                committable.identifier(),
+                committable.logOffsets(),
+                Snapshot.CommitKind.APPEND,
+                false);
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
+            tryCommit(
+                    compactChanges,
+                    committable.identifier(),
+                    committable.logOffsets(),
+                    Snapshot.CommitKind.COMPACT,
+                    true);
         }
     }
 
@@ -190,31 +200,42 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
         }
         // overwrite new files
-        tryOverwrite(partitionFilter, appendChanges, committable.identifier());
+        tryOverwrite(
+                partitionFilter, appendChanges, committable.identifier(), committable.logOffsets());
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
+            tryCommit(
+                    compactChanges,
+                    committable.identifier(),
+                    committable.logOffsets(),
+                    Snapshot.CommitKind.COMPACT,
+                    true);
         }
     }
 
     private void tryCommit(
             List<ManifestEntry> changes,
             String hash,
+            Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             boolean checkDeletedFiles) {
         while (true) {
             Long latestSnapshotId = pathFactory.latestSnapshotId();
-            if (tryCommitOnce(changes, hash, commitKind, latestSnapshotId, checkDeletedFiles)) {
+            if (tryCommitOnce(
+                    changes, hash, logOffsets, commitKind, latestSnapshotId, checkDeletedFiles)) {
                 break;
             }
         }
     }
 
     private void tryOverwrite(
-            Predicate partitionFilter, List<ManifestEntry> changes, String identifier) {
+            Predicate partitionFilter,
+            List<ManifestEntry> changes,
+            String identifier,
+            Map<Integer, Long> logOffsets) {
         while (true) {
             Long latestSnapshotId = pathFactory.latestSnapshotId();
 
@@ -240,6 +261,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             if (tryCommitOnce(
                     changesWithOverwrite,
                     identifier,
+                    logOffsets,
                     Snapshot.CommitKind.OVERWRITE,
                     latestSnapshotId,
                     false)) {
@@ -274,6 +296,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private boolean tryCommitOnce(
             List<ManifestEntry> changes,
             String identifier,
+            Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             Long latestSnapshotId,
             boolean checkDeletedFiles) {
@@ -306,6 +329,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             if (latestSnapshot != null) {
                 // read all previous manifest files
                 oldMetas.addAll(latestSnapshot.readAllManifests(manifestList));
+                // read the latest snapshot to complete the buckets' offsets when logOffsets
+                // does not contain all buckets
+                latestSnapshot.getLogOffsets().forEach(logOffsets::putIfAbsent);
             }
             // merge manifest files with changes
             newMetas.addAll(
@@ -330,7 +356,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             commitUser,
                             identifier,
                             commitKind,
-                            System.currentTimeMillis());
+                            System.currentTimeMillis(),
+                            logOffsets);
             FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
         } catch (Throwable e) {
             // fails when preparing for commit, we should clean up
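
A minimal sketch, with assumed bucket numbers and offsets, of the carry-over
behaviour that tryCommitOnce now applies: offsets supplied by the current
commit win, and buckets the commit did not touch keep their offsets from the
latest snapshot.

    Map<Integer, Long> logOffsets = new HashMap<>();
    logOffsets.put(0, 300L); // bucket 0 was written by this commit

    // pretend these came from latestSnapshot.getLogOffsets()
    Map<Integer, Long> previousOffsets = new HashMap<>();
    previousOffsets.put(0, 200L);
    previousOffsets.put(1, 150L);

    previousOffsets.forEach(logOffsets::putIfAbsent);
    // logOffsets is now {0=300, 1=150}: bucket 0 keeps the new offset,
    // bucket 1 is completed from the previous snapshot.
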
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 58adb76..f10642b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -60,6 +60,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -75,6 +76,8 @@ public class TestFileStore extends FileStoreImpl {
     private final RowDataSerializer keySerializer;
     private final RowDataSerializer valueSerializer;
 
+    private static final AtomicInteger ID = new AtomicInteger();
+
     public static TestFileStore create(
             String format,
             String root,
@@ -188,6 +191,9 @@ public class TestFileStore extends FileStoreImpl {
                 Increment increment = entryWithBucket.getValue().prepareCommit();
                 committable.addFileCommittable(
                         entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+                if (!committable.logOffsets().containsKey(entryWithBucket.getKey())) {
+                    committable.addLogOffset(entryWithBucket.getKey(), ID.getAndIncrement());
+                }
             }
         }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 21d0a63..7224d19 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -208,6 +208,67 @@ public class FileStoreCommitTest {
         assertThat(actual).isEqualTo(expected);
     }
 
+    @Test
+    public void testSnapshotAddLogOffset() throws Exception {
+        Map<BinaryRowData, List<KeyValue>> data1 =
+                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+        logData(
+                () ->
+                        data1.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data1");
+
+        TestFileStore store = createStore(false);
+        List<Snapshot> commitSnapshots =
+                store.commitData(
+                        data1.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                        gen::getPartition,
+                        kv -> Math.toIntExact(kv.sequenceNumber() % 10));
+
+        Map<Integer, Long> commitLogOffsets = commitSnapshots.get(0).getLogOffsets();
+        assertThat(commitLogOffsets.size()).isEqualTo(10);
+        commitLogOffsets.forEach((key, value) -> assertThat(key).isEqualTo(value.intValue()));
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String dtToOverwrite =
+                new ArrayList<>(data1.keySet())
+                        .get(random.nextInt(data1.size()))
+                        .getString(0)
+                        .toString();
+        Map<String, String> partitionToOverwrite = new HashMap<>();
+        partitionToOverwrite.put("dt", dtToOverwrite);
+
+        // overwrite partial commit
+        int numRecords = ThreadLocalRandom.current().nextInt(5) + 1;
+        Map<BinaryRowData, List<KeyValue>> data2 = generateData(numRecords);
+        data2.entrySet().removeIf(e -> !dtToOverwrite.equals(e.getKey().getString(0).toString()));
+        logData(
+                () ->
+                        data2.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data2");
+        List<Snapshot> overwriteSnapshots =
+                store.overwriteData(
+                        data2.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                        gen::getPartition,
+                        kv -> Math.toIntExact(kv.sequenceNumber() % 10),
+                        partitionToOverwrite);
+
+        Map<Integer, Long> overwriteLogOffsets = overwriteSnapshots.get(0).getLogOffsets();
+        assertThat(overwriteLogOffsets.size()).isEqualTo(commitLogOffsets.size());
+        assertThat(
+                        overwriteLogOffsets.entrySet().stream()
+                                .filter(o -> !o.getKey().equals(o.getValue().intValue()))
+                                .count())
+                .isLessThanOrEqualTo(numRecords);
+    }
+
     private TestFileStore createStore(boolean failing) {
         String root =
                 failing