You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2024/01/15 02:12:03 UTC
(incubator-paimon) 19/20: [core] Fix creation failure caused by savepoint tag (#2515)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 0e609d70da0abef12bf2d89c217fed641704fd41
Author: monster <60...@users.noreply.github.com>
AuthorDate: Tue Dec 19 16:30:48 2023 +0800
[core] Fix creation failure caused by savepoint tag (#2515)
---
.../org/apache/paimon/tag/TagAutoCreation.java | 4 +--
.../java/org/apache/paimon/utils/TagManager.java | 26 ++++++++++++++----
.../org/apache/paimon/tag/TagAutoCreationTest.java | 31 ++++++++++++++++++++++
3 files changed, 54 insertions(+), 7 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index e02da25c1..c4e9f7948 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -67,7 +67,7 @@ public class TagAutoCreation {
this.periodHandler.validateDelay(delay);
- SortedMap<Snapshot, String> tags = tagManager.tags();
+ SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint"));
if (tags.isEmpty()) {
this.nextSnapshot =
@@ -122,7 +122,7 @@ public class TagAutoCreation {
nextTag = periodHandler.nextTagTime(thisTag);
if (numRetainedMax != null) {
- SortedMap<Snapshot, String> tags = tagManager.tags();
+ SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint"));
if (tags.size() > numRetainedMax) {
int toDelete = tags.size() - numRetainedMax;
int i = 0;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 927dbd373..bd526ee2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -179,6 +179,21 @@ public class TagManager {
/** Get all tagged snapshots with names sorted by snapshot id. */
public SortedMap<Snapshot, String> tags() {
+ return tags(tagName -> true);
+ }
+
+ /**
+ * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate
+ * determines which tag names should be included in the result. Only snapshots with tag names
+ * that pass the predicate test are included.
+ *
+ * @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the
+ * test are excluded from the result.
+ * @return A sorted map whose keys are the filtered snapshots, ordered by snapshot id, each
+ * associated with its tag name.
+ * @throws RuntimeException if an IOException occurs during retrieval of snapshots.
+ */
+ public SortedMap<Snapshot, String> tags(Predicate<String> filter) {
TreeMap<Snapshot, String> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
List<Path> paths =
@@ -187,14 +202,15 @@ public class TagManager {
.collect(Collectors.toList());
for (Path path : paths) {
+ String tagName = path.getName().substring(TAG_PREFIX.length());
+
+ if (!filter.test(tagName)) {
+ continue;
+ }
// If the tag file is not found, it might be deleted by
// other processes, so just skip this tag
Snapshot.safelyFromPath(fileIO, path)
- .ifPresent(
- snapshot ->
- tags.put(
- snapshot,
- path.getName().substring(TAG_PREFIX.length())));
+ .ifPresent(snapshot -> tags.put(snapshot, tagName));
}
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 53dd6aed2..3b753093b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -215,6 +215,37 @@ public class TagAutoCreationTest extends PrimaryKeyTableTestBase {
assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-19");
}
+ @Test
+ public void testSavepointTag() {
+ Options options = new Options();
+ options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+ options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+ options.set(TAG_NUM_RETAINED_MAX, 3);
+ FileStoreTable table;
+ TableCommitImpl commit;
+ TagManager tagManager;
+ table = this.table.copy(options.toMap());
+
+ commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+ tagManager = table.store().newTagManager();
+
+ // test normal creation
+ commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
+ assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+
+ table.createTag("savepoint-11", 1);
+
+ // test that a new commit still creates the next tag after a savepoint tag exists
+ commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00")));
+ assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-18 13");
+
+ // test that old auto-created tags expire while the savepoint tag is kept
+ commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:00:00")));
+ commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00")));
+ assertThat(tagManager.tags().values())
+ .containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 14", "2023-07-18 15");
+ }
+
private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())