You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2024/01/15 02:12:03 UTC

(incubator-paimon) 19/20: [core] Fix creation failure caused by savepoint tag (#2515)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 0e609d70da0abef12bf2d89c217fed641704fd41
Author: monster <60...@users.noreply.github.com>
AuthorDate: Tue Dec 19 16:30:48 2023 +0800

    [core] Fix creation failure caused by savepoint tag (#2515)
---
 .../org/apache/paimon/tag/TagAutoCreation.java     |  4 +--
 .../java/org/apache/paimon/utils/TagManager.java   | 26 ++++++++++++++----
 .../org/apache/paimon/tag/TagAutoCreationTest.java | 31 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index e02da25c1..c4e9f7948 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -67,7 +67,7 @@ public class TagAutoCreation {
 
         this.periodHandler.validateDelay(delay);
 
-        SortedMap<Snapshot, String> tags = tagManager.tags();
+        SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint"));
 
         if (tags.isEmpty()) {
             this.nextSnapshot =
@@ -122,7 +122,7 @@ public class TagAutoCreation {
             nextTag = periodHandler.nextTagTime(thisTag);
 
             if (numRetainedMax != null) {
-                SortedMap<Snapshot, String> tags = tagManager.tags();
+                SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint"));
                 if (tags.size() > numRetainedMax) {
                     int toDelete = tags.size() - numRetainedMax;
                     int i = 0;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 927dbd373..bd526ee2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -179,6 +179,21 @@ public class TagManager {
 
     /** Get all tagged snapshots with names sorted by snapshot id. */
     public SortedMap<Snapshot, String> tags() {
+        return tags(tagName -> true);
+    }
+
+    /**
+     * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate
+     * determines which tag names should be included in the result. Only snapshots with tag names
+     * that pass the predicate test are included.
+     *
+     * @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the
+     *     test are excluded from the result.
+     * @return A sorted map of filtered snapshots keyed by their IDs, each associated with its tag
+     *     name.
+     * @throws RuntimeException if an IOException occurs during retrieval of snapshots.
+     */
+    public SortedMap<Snapshot, String> tags(Predicate<String> filter) {
         TreeMap<Snapshot, String> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id));
         try {
             List<Path> paths =
@@ -187,14 +202,15 @@ public class TagManager {
                             .collect(Collectors.toList());
 
             for (Path path : paths) {
+                String tagName = path.getName().substring(TAG_PREFIX.length());
+
+                if (!filter.test(tagName)) {
+                    continue;
+                }
                 // If the tag file is not found, it might be deleted by
                 // other processes, so just skip this tag
                 Snapshot.safelyFromPath(fileIO, path)
-                        .ifPresent(
-                                snapshot ->
-                                        tags.put(
-                                                snapshot,
-                                                path.getName().substring(TAG_PREFIX.length())));
+                        .ifPresent(snapshot -> tags.put(snapshot, tagName));
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 53dd6aed2..3b753093b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -215,6 +215,37 @@ public class TagAutoCreationTest extends PrimaryKeyTableTestBase {
         assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-19");
     }
 
+    @Test
+    public void testSavepointTag() {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 3);
+        FileStoreTable table;
+        TableCommitImpl commit;
+        TagManager tagManager;
+        table = this.table.copy(options.toMap());
+
+        commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        tagManager = table.store().newTagManager();
+
+        // test normal creation
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+
+        table.createTag("savepoint-11", 1);
+
+        // test newCommit create
+        commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00")));
+        assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-18 13");
+
+        // test expire old tag
+        commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:00:00")));
+        commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00")));
+        assertThat(tagManager.tags().values())
+                .containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 14", "2023-07-18 15");
+    }
+
     private long localZoneMills(String timestamp) {
         return LocalDateTime.parse(timestamp)
                 .atZone(ZoneId.systemDefault())