You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/08 06:59:29 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-29805] Fix incorrect snapshot filter when snapshots are committing too slow

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new d41d14a5 [FLINK-29805] Fix incorrect snapshot filter when snapshots are committing too slow
d41d14a5 is described below

commit d41d14a5921aa27ebd903eed45365841093e97ed
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Nov 8 14:59:24 2022 +0800

    [FLINK-29805] Fix incorrect snapshot filter when snapshots are committing too slow
    
    This closes #360.
---
 .../store/file/operation/FileStoreCommitImpl.java  | 40 +++++++---------------
 .../store/file/operation/FileStoreExpireImpl.java  |  8 +----
 .../table/store/file/utils/SnapshotManager.java    | 34 ++++++++++++++++--
 .../flink/table/store/file/TestFileStore.java      |  4 +--
 .../store/file/operation/FileStoreCommitTest.java  | 34 +++++++++++++++---
 .../store/file/operation/FileStoreExpireTest.java  |  4 +--
 6 files changed, 79 insertions(+), 45 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 8103b570..51ca590e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -50,10 +50,10 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
@@ -149,36 +149,20 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             return committableList;
         }
 
-        // if there is no previous snapshots then nothing should be filtered
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-        if (latestSnapshotId == null) {
-            return committableList;
-        }
-
-        // check if a committable is already committed by its identifier
-        Map<Long, ManifestCommittable> identifiers = new LinkedHashMap<>();
-        for (ManifestCommittable committable : committableList) {
-            identifiers.put(committable.identifier(), committable);
-        }
-
-        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
-            if (!snapshotManager.snapshotExists(id)) {
-                // snapshots before this are expired
-                break;
-            }
-            Snapshot snapshot = snapshotManager.snapshot(id);
-            if (commitUser.equals(snapshot.commitUser())) {
-                if (identifiers.containsKey(snapshot.commitIdentifier())) {
-                    identifiers.remove(snapshot.commitIdentifier());
-                } else {
-                    // early exit, because committableList must be the latest commits by this
-                    // commit user
-                    break;
+        Optional<Snapshot> latestSnapshot = snapshotManager.latestSnapshotOfUser(commitUser);
+        if (latestSnapshot.isPresent()) {
+            List<ManifestCommittable> result = new ArrayList<>();
+            for (ManifestCommittable committable : committableList) {
+                // if committable is newer than latest snapshot, then it hasn't been committed
+                if (committable.identifier() > latestSnapshot.get().commitIdentifier()) {
+                    result.add(committable);
                 }
             }
+            return result;
+        } else {
+            // if there is no previous snapshots then nothing should be filtered
+            return committableList;
         }
-
-        return new ArrayList<>(identifiers.values());
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 8cb11210..7901fc91 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -100,12 +99,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
 
         long currentMillis = System.currentTimeMillis();
 
-        Long earliest;
-        try {
-            earliest = snapshotManager.findEarliest();
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to find earliest snapshot id", e);
-        }
+        Long earliest = snapshotManager.earliestSnapshotId();
         if (earliest == null) {
             return;
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 86c0e22c..93670b42 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.utils;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.function.BinaryOperator;
 
@@ -79,7 +81,62 @@ public class SnapshotManager {
         }
     }
 
-    public Long findLatest() throws IOException {
+    /**
+     * Returns the id of the earliest snapshot, or {@code null} if none is found.
+     *
+     * <p>I/O failures while locating the snapshot are rethrown as {@link RuntimeException}.
+     */
+    public @Nullable Long earliestSnapshotId() {
+        try {
+            return findEarliest();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find earliest snapshot id", e);
+        }
+    }
+
+    /**
+     * Returns the most recent snapshot committed by {@code user}, scanning backwards from the
+     * latest snapshot towards the earliest one.
+     *
+     * <p>Snapshots may be expired concurrently with this scan. If reading a snapshot fails and its
+     * id is by then below the earliest existing snapshot, the scan stops because no older
+     * snapshot is left to inspect; any other failure is rethrown.
+     *
+     * @param user the commit user to look for
+     * @return the latest snapshot of {@code user}, or {@link Optional#empty()} if none is found
+     */
+    public Optional<Snapshot> latestSnapshotOfUser(String user) {
+        Long latestId = latestSnapshotId();
+        if (latestId == null) {
+            return Optional.empty();
+        }
+
+        long earliestId =
+                Preconditions.checkNotNull(
+                        earliestSnapshotId(),
+                        "Latest snapshot id is not null, but earliest snapshot id is null. "
+                                + "This is unexpected.");
+        for (long id = latestId; id >= earliestId; id--) {
+            Snapshot snapshot;
+            try {
+                snapshot = snapshot(id);
+            } catch (RuntimeException e) {
+                // The snapshot may have been expired after earliestId was read. Only tolerate
+                // the failure if this id is now below the earliest remaining snapshot.
+                Long newEarliestId = earliestSnapshotId();
+                if (newEarliestId == null || id >= newEarliestId) {
+                    throw e;
+                }
+                break;
+            }
+            if (user.equals(snapshot.commitUser())) {
+                return Optional.of(snapshot);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private @Nullable Long findLatest() throws IOException {
         Path snapshotDir = snapshotDirectory();
         FileSystem fs = snapshotDir.getFileSystem();
         if (!fs.exists(snapshotDir)) {
@@ -98,7 +155,7 @@ public class SnapshotManager {
         return findByListFiles(Math::max);
     }
 
-    public Long findEarliest() throws IOException {
+    private @Nullable Long findEarliest() throws IOException {
         Path snapshotDir = snapshotDirectory();
         FileSystem fs = snapshotDir.getFileSystem();
         if (!fs.exists(snapshotDir)) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 54742c0b..8c8a7242 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -376,12 +376,12 @@ public class TestFileStore extends KeyValueFileStore {
         if (actualFiles.remove(earliest)) {
             long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST);
             earliest.getFileSystem().delete(earliest, false);
-            assertThat(earliestId <= snapshotManager.findEarliest()).isTrue();
+            assertThat(earliestId <= snapshotManager.earliestSnapshotId()).isTrue();
         }
         if (actualFiles.remove(latest)) {
             long latestId = snapshotManager.readHint(SnapshotManager.LATEST);
             latest.getFileSystem().delete(latest, false);
-            assertThat(latestId <= snapshotManager.findLatest()).isTrue();
+            assertThat(latestId <= snapshotManager.latestSnapshotId()).isTrue();
         }
         actualFiles.remove(latest);
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index a6970aa6..12647b2e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -46,6 +46,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -100,12 +101,12 @@ public class FileStoreCommitTest {
 
         assertThat(latest.getFileSystem().exists(latest)).isTrue();
 
-        Long latestId = snapshotManager.findLatest();
+        Long latestId = snapshotManager.latestSnapshotId();
 
         // remove latest hint file
         latest.getFileSystem().delete(latest, false);
 
-        assertThat(snapshotManager.findLatest()).isEqualTo(latestId);
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(latestId);
     }
 
     @Test
@@ -121,6 +122,31 @@ public class FileStoreCommitTest {
                 .filterCommitted(Collections.singletonList(new ManifestCommittable(999)));
     }
 
+    @Test
+    public void testFilterAllCommits() throws Exception {
+        testRandomConcurrentNoConflict(1, false);
+        TestFileStore store = createStore(false);
+        SnapshotManager snapshotManager = store.snapshotManager();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        LinkedHashSet<Long> commitIdentifiers = new LinkedHashSet<>();
+        String user = "";
+        for (long id = Snapshot.FIRST_SNAPSHOT_ID; id <= latestSnapshotId; id++) {
+            Snapshot snapshot = snapshotManager.snapshot(id);
+            commitIdentifiers.add(snapshot.commitIdentifier());
+            user = snapshot.commitUser();
+        }
+
+        // all commit identifiers should be filtered out
+        List<ManifestCommittable> remaining =
+                store.newCommit(user)
+                        .filterCommitted(
+                                commitIdentifiers.stream()
+                                        .map(ManifestCommittable::new)
+                                        .collect(Collectors.toList()));
+        assertThat(remaining).isEmpty();
+    }
+
     protected void testRandomConcurrentNoConflict(int numThreads, boolean failing)
             throws Exception {
         // prepare test data
@@ -328,7 +354,7 @@ public class FileStoreCommitTest {
                 false,
                 null,
                 (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
-        assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id());
+        assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id());
 
         // commit empty new files
         store.commitDataImpl(
@@ -341,7 +367,7 @@ public class FileStoreCommitTest {
                     commit.withCreateEmptyCommit(true);
                     commit.commit(committable, Collections.emptyMap());
                 });
-        assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id() + 1);
+        assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id() + 1);
     }
 
     @Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index b9fac9fd..741b8ea1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -206,12 +206,12 @@ public class FileStoreExpireTest {
 
         assertThat(earliest.getFileSystem().exists(earliest)).isTrue();
 
-        Long earliestId = snapshotManager.findEarliest();
+        Long earliestId = snapshotManager.earliestSnapshotId();
 
         // remove earliest hint file
         earliest.getFileSystem().delete(earliest, false);
 
-        assertThat(snapshotManager.findEarliest()).isEqualTo(earliestId);
+        assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(earliestId);
     }
 
     @Test