You are viewing a plain-text version of this content. The canonical link for it was not carried over into this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/10 09:27:53 UTC
[flink-table-store] branch master updated: [FLINK-26293] Store log offsets in snapshot
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e6037c9 [FLINK-26293] Store log offsets in snapshot
e6037c9 is described below
commit e6037c95a0e7fdc8724c5536d2d71951be4ce030
Author: SteNicholas <pr...@163.com>
AuthorDate: Thu Mar 10 17:27:48 2022 +0800
[FLINK-26293] Store log offsets in snapshot
This closes #31
---
.../apache/flink/table/store/file/Snapshot.java | 14 ++++-
.../store/file/operation/FileStoreCommitImpl.java | 41 ++++++++++++---
.../flink/table/store/file/TestFileStore.java | 6 +++
.../store/file/operation/FileStoreCommitTest.java | 61 ++++++++++++++++++++++
4 files changed, 114 insertions(+), 8 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 6fc84ce..6d5c5d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -32,6 +32,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/** This file is the entrance to all data committed at some specific time point. */
public class Snapshot {
@@ -45,6 +46,7 @@ public class Snapshot {
private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
private static final String FIELD_COMMIT_KIND = "commitKind";
private static final String FIELD_TIME_MILLIS = "timeMillis";
+ private static final String FIELD_LOG_OFFSETS = "logOffsets";
@JsonProperty(FIELD_ID)
private final long id;
@@ -71,6 +73,9 @@ public class Snapshot {
@JsonProperty(FIELD_TIME_MILLIS)
private final long timeMillis;
+ @JsonProperty(FIELD_LOG_OFFSETS)
+ private final Map<Integer, Long> logOffsets;
+
@JsonCreator
public Snapshot(
@JsonProperty(FIELD_ID) long id,
@@ -79,7 +84,8 @@ public class Snapshot {
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
- @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
+ @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
+ @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) {
this.id = id;
this.baseManifestList = baseManifestList;
this.deltaManifestList = deltaManifestList;
@@ -87,6 +93,7 @@ public class Snapshot {
this.commitIdentifier = commitIdentifier;
this.commitKind = commitKind;
this.timeMillis = timeMillis;
+ this.logOffsets = logOffsets;
}
@JsonGetter(FIELD_ID)
@@ -124,6 +131,11 @@ public class Snapshot {
return timeMillis;
}
+ @JsonGetter(FIELD_LOG_OFFSETS)
+ public Map<Integer, Long> getLogOffsets() {
+ return logOffsets;
+ }
+
public String toJson() {
try {
return new ObjectMapper().writeValueAsString(this);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index d39f05a..9673d02 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -151,13 +151,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
- tryCommit(appendChanges, committable.identifier(), Snapshot.CommitKind.APPEND, false);
+ tryCommit(
+ appendChanges,
+ committable.identifier(),
+ committable.logOffsets(),
+ Snapshot.CommitKind.APPEND,
+ false);
List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
if (!compactChanges.isEmpty()) {
- tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
+ tryCommit(
+ compactChanges,
+ committable.identifier(),
+ committable.logOffsets(),
+ Snapshot.CommitKind.COMPACT,
+ true);
}
}
@@ -190,31 +200,42 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
// overwrite new files
- tryOverwrite(partitionFilter, appendChanges, committable.identifier());
+ tryOverwrite(
+ partitionFilter, appendChanges, committable.identifier(), committable.logOffsets());
List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
if (!compactChanges.isEmpty()) {
- tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
+ tryCommit(
+ compactChanges,
+ committable.identifier(),
+ committable.logOffsets(),
+ Snapshot.CommitKind.COMPACT,
+ true);
}
}
private void tryCommit(
List<ManifestEntry> changes,
String hash,
+ Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
boolean checkDeletedFiles) {
while (true) {
Long latestSnapshotId = pathFactory.latestSnapshotId();
- if (tryCommitOnce(changes, hash, commitKind, latestSnapshotId, checkDeletedFiles)) {
+ if (tryCommitOnce(
+ changes, hash, logOffsets, commitKind, latestSnapshotId, checkDeletedFiles)) {
break;
}
}
}
private void tryOverwrite(
- Predicate partitionFilter, List<ManifestEntry> changes, String identifier) {
+ Predicate partitionFilter,
+ List<ManifestEntry> changes,
+ String identifier,
+ Map<Integer, Long> logOffsets) {
while (true) {
Long latestSnapshotId = pathFactory.latestSnapshotId();
@@ -240,6 +261,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (tryCommitOnce(
changesWithOverwrite,
identifier,
+ logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshotId,
false)) {
@@ -274,6 +296,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private boolean tryCommitOnce(
List<ManifestEntry> changes,
String identifier,
+ Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long latestSnapshotId,
boolean checkDeletedFiles) {
@@ -306,6 +329,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (latestSnapshot != null) {
// read all previous manifest files
oldMetas.addAll(latestSnapshot.readAllManifests(manifestList));
+ // read the last snapshot to complete the bucket's offsets when logOffsets does not
+ // contain all buckets
+ latestSnapshot.getLogOffsets().forEach(logOffsets::putIfAbsent);
}
// merge manifest files with changes
newMetas.addAll(
@@ -330,7 +356,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
commitUser,
identifier,
commitKind,
- System.currentTimeMillis());
+ System.currentTimeMillis(),
+ logOffsets);
FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 58adb76..f10642b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -60,6 +60,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -75,6 +76,8 @@ public class TestFileStore extends FileStoreImpl {
private final RowDataSerializer keySerializer;
private final RowDataSerializer valueSerializer;
+ private static final AtomicInteger ID = new AtomicInteger();
+
public static TestFileStore create(
String format,
String root,
@@ -188,6 +191,9 @@ public class TestFileStore extends FileStoreImpl {
Increment increment = entryWithBucket.getValue().prepareCommit();
committable.addFileCommittable(
entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+ if (!committable.logOffsets().containsKey(entryWithBucket.getKey())) {
+ committable.addLogOffset(entryWithBucket.getKey(), ID.getAndIncrement());
+ }
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 21d0a63..7224d19 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -208,6 +208,67 @@ public class FileStoreCommitTest {
assertThat(actual).isEqualTo(expected);
}
+ @Test
+ public void testSnapshotAddLogOffset() throws Exception {
+ Map<BinaryRowData, List<KeyValue>> data1 =
+ generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+ logData(
+ () ->
+ data1.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ "data1");
+
+ TestFileStore store = createStore(false);
+ List<Snapshot> commitSnapshots =
+ store.commitData(
+ data1.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ gen::getPartition,
+ kv -> Math.toIntExact(kv.sequenceNumber() % 10));
+
+ Map<Integer, Long> commitLogOffsets = commitSnapshots.get(0).getLogOffsets();
+ assertThat(commitLogOffsets.size()).isEqualTo(10);
+ commitLogOffsets.forEach((key, value) -> assertThat(key).isEqualTo(value.intValue()));
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ String dtToOverwrite =
+ new ArrayList<>(data1.keySet())
+ .get(random.nextInt(data1.size()))
+ .getString(0)
+ .toString();
+ Map<String, String> partitionToOverwrite = new HashMap<>();
+ partitionToOverwrite.put("dt", dtToOverwrite);
+
+ // overwrite partial commit
+ int numRecords = ThreadLocalRandom.current().nextInt(5) + 1;
+ Map<BinaryRowData, List<KeyValue>> data2 = generateData(numRecords);
+ data2.entrySet().removeIf(e -> !dtToOverwrite.equals(e.getKey().getString(0).toString()));
+ logData(
+ () ->
+ data2.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ "data2");
+ List<Snapshot> overwriteSnapshots =
+ store.overwriteData(
+ data2.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ gen::getPartition,
+ kv -> Math.toIntExact(kv.sequenceNumber() % 10),
+ partitionToOverwrite);
+
+ Map<Integer, Long> overwriteLogOffsets = overwriteSnapshots.get(0).getLogOffsets();
+ assertThat(overwriteLogOffsets.size()).isEqualTo(commitLogOffsets.size());
+ assertThat(
+ overwriteLogOffsets.entrySet().stream()
+ .filter(o -> !o.getKey().equals(o.getValue().intValue()))
+ .count())
+ .isLessThanOrEqualTo(numRecords);
+ }
+
private TestFileStore createStore(boolean failing) {
String root =
failing