You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/08 06:59:29 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-29805] Fix incorrect snapshot filter when snapshots are committing too slow
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new d41d14a5 [FLINK-29805] Fix incorrect snapshot filter when snapshots are committing too slow
d41d14a5 is described below
commit d41d14a5921aa27ebd903eed45365841093e97ed
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Nov 8 14:59:24 2022 +0800
[FLINK-29805] Fix incorrect snapshot filter when snapshots are committing too slow
This closes #360.
---
.../store/file/operation/FileStoreCommitImpl.java | 40 +++++++---------------
.../store/file/operation/FileStoreExpireImpl.java | 8 +----
.../table/store/file/utils/SnapshotManager.java | 34 ++++++++++++++++--
.../flink/table/store/file/TestFileStore.java | 4 +--
.../store/file/operation/FileStoreCommitTest.java | 34 +++++++++++++++---
.../store/file/operation/FileStoreExpireTest.java | 4 +--
6 files changed, 79 insertions(+), 45 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 8103b570..51ca590e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -50,10 +50,10 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
@@ -149,36 +149,20 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return committableList;
}
- // if there is no previous snapshots then nothing should be filtered
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- if (latestSnapshotId == null) {
- return committableList;
- }
-
- // check if a committable is already committed by its identifier
- Map<Long, ManifestCommittable> identifiers = new LinkedHashMap<>();
- for (ManifestCommittable committable : committableList) {
- identifiers.put(committable.identifier(), committable);
- }
-
- for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
- if (!snapshotManager.snapshotExists(id)) {
- // snapshots before this are expired
- break;
- }
- Snapshot snapshot = snapshotManager.snapshot(id);
- if (commitUser.equals(snapshot.commitUser())) {
- if (identifiers.containsKey(snapshot.commitIdentifier())) {
- identifiers.remove(snapshot.commitIdentifier());
- } else {
- // early exit, because committableList must be the latest commits by this
- // commit user
- break;
+ Optional<Snapshot> latestSnapshot = snapshotManager.latestSnapshotOfUser(commitUser);
+ if (latestSnapshot.isPresent()) {
+ List<ManifestCommittable> result = new ArrayList<>();
+ for (ManifestCommittable committable : committableList) {
+ // if committable is newer than latest snapshot, then it hasn't been committed
+ if (committable.identifier() > latestSnapshot.get().commitIdentifier()) {
+ result.add(committable);
}
}
+ return result;
+ } else {
+ // if there are no previous snapshots then nothing should be filtered
+ return committableList;
}
-
- return new ArrayList<>(identifiers.values());
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 8cb11210..7901fc91 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -100,12 +99,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
long currentMillis = System.currentTimeMillis();
- Long earliest;
- try {
- earliest = snapshotManager.findEarliest();
- } catch (IOException e) {
- throw new RuntimeException("Failed to find earliest snapshot id", e);
- }
+ Long earliest = snapshotManager.earliestSnapshotId();
if (earliest == null) {
return;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 86c0e22c..93670b42 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.utils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Optional;
import java.util.UUID;
import java.util.function.BinaryOperator;
@@ -79,7 +81,35 @@ public class SnapshotManager {
}
}
- public Long findLatest() throws IOException {
+ public @Nullable Long earliestSnapshotId() {
+ try {
+ return findEarliest();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find earliest snapshot id", e);
+ }
+ }
+
+ public Optional<Snapshot> latestSnapshotOfUser(String user) {
+ Long latestId = latestSnapshotId();
+ if (latestId == null) {
+ return Optional.empty();
+ }
+
+ long earliestId =
+ Preconditions.checkNotNull(
+ earliestSnapshotId(),
+ "Latest snapshot id is not null, but earliest snapshot id is null. "
+ + "This is unexpected.");
+ for (long id = latestId; id >= earliestId; id--) {
+ Snapshot snapshot = snapshot(id);
+ if (user.equals(snapshot.commitUser())) {
+ return Optional.of(snapshot);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private @Nullable Long findLatest() throws IOException {
Path snapshotDir = snapshotDirectory();
FileSystem fs = snapshotDir.getFileSystem();
if (!fs.exists(snapshotDir)) {
@@ -98,7 +128,7 @@ public class SnapshotManager {
return findByListFiles(Math::max);
}
- public Long findEarliest() throws IOException {
+ private @Nullable Long findEarliest() throws IOException {
Path snapshotDir = snapshotDirectory();
FileSystem fs = snapshotDir.getFileSystem();
if (!fs.exists(snapshotDir)) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 54742c0b..8c8a7242 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -376,12 +376,12 @@ public class TestFileStore extends KeyValueFileStore {
if (actualFiles.remove(earliest)) {
long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST);
earliest.getFileSystem().delete(earliest, false);
- assertThat(earliestId <= snapshotManager.findEarliest()).isTrue();
+ assertThat(earliestId <= snapshotManager.earliestSnapshotId()).isTrue();
}
if (actualFiles.remove(latest)) {
long latestId = snapshotManager.readHint(SnapshotManager.LATEST);
latest.getFileSystem().delete(latest, false);
- assertThat(latestId <= snapshotManager.findLatest()).isTrue();
+ assertThat(latestId <= snapshotManager.latestSnapshotId()).isTrue();
}
actualFiles.remove(latest);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index a6970aa6..12647b2e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -100,12 +101,12 @@ public class FileStoreCommitTest {
assertThat(latest.getFileSystem().exists(latest)).isTrue();
- Long latestId = snapshotManager.findLatest();
+ Long latestId = snapshotManager.latestSnapshotId();
// remove latest hint file
latest.getFileSystem().delete(latest, false);
- assertThat(snapshotManager.findLatest()).isEqualTo(latestId);
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(latestId);
}
@Test
@@ -121,6 +122,31 @@ public class FileStoreCommitTest {
.filterCommitted(Collections.singletonList(new ManifestCommittable(999)));
}
+ @Test
+ public void testFilterAllCommits() throws Exception {
+ testRandomConcurrentNoConflict(1, false);
+ TestFileStore store = createStore(false);
+ SnapshotManager snapshotManager = store.snapshotManager();
+ long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+ LinkedHashSet<Long> commitIdentifiers = new LinkedHashSet<>();
+ String user = "";
+ for (long id = Snapshot.FIRST_SNAPSHOT_ID; id <= latestSnapshotId; id++) {
+ Snapshot snapshot = snapshotManager.snapshot(id);
+ commitIdentifiers.add(snapshot.commitIdentifier());
+ user = snapshot.commitUser();
+ }
+
+ // all commit identifiers should be filtered out
+ List<ManifestCommittable> remaining =
+ store.newCommit(user)
+ .filterCommitted(
+ commitIdentifiers.stream()
+ .map(ManifestCommittable::new)
+ .collect(Collectors.toList()));
+ assertThat(remaining).isEmpty();
+ }
+
protected void testRandomConcurrentNoConflict(int numThreads, boolean failing)
throws Exception {
// prepare test data
@@ -328,7 +354,7 @@ public class FileStoreCommitTest {
false,
null,
(commit, committable) -> commit.commit(committable, Collections.emptyMap()));
- assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id());
+ assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id());
// commit empty new files
store.commitDataImpl(
@@ -341,7 +367,7 @@ public class FileStoreCommitTest {
commit.withCreateEmptyCommit(true);
commit.commit(committable, Collections.emptyMap());
});
- assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id() + 1);
+ assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id() + 1);
}
@Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index b9fac9fd..741b8ea1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -206,12 +206,12 @@ public class FileStoreExpireTest {
assertThat(earliest.getFileSystem().exists(earliest)).isTrue();
- Long earliestId = snapshotManager.findEarliest();
+ Long earliestId = snapshotManager.earliestSnapshotId();
// remove earliest hint file
earliest.getFileSystem().delete(earliest, false);
- assertThat(snapshotManager.findEarliest()).isEqualTo(earliestId);
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(earliestId);
}
@Test