You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2024/01/15 02:14:38 UTC

(incubator-paimon) branch release-0.6 updated (bdb206807 -> 570e52792)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


    from bdb206807 [core] Use isAutoTag in TagAutoCreation
     new 146a21ed5 [flink] Generate tags when Flink Batch is completed
     new 570e52792 [core] Fix that some tag creations haven't handle tag callbacks (#2550)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/maintenance/manage-tags.md            |   1 +
 .../shortcodes/generated/core_configuration.html   |   4 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   6 +-
 .../java/org/apache/paimon/AbstractFileStore.java  |  31 ++++-
 .../org/apache/paimon/AppendOnlyFileStore.java     |   6 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   4 +
 .../java/org/apache/paimon/KeyValueFileStore.java  |   6 +-
 .../paimon/table/AbstractFileStoreTable.java       |  76 +----------
 .../paimon/table/AppendOnlyFileStoreTable.java     |   3 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |   3 +-
 .../apache/paimon/table/sink/CallbackUtils.java    |  75 +++++++++++
 .../org/apache/paimon/tag/TagAutoCreation.java     |  15 ++-
 .../java/org/apache/paimon/tag/TagPreview.java     |   3 +-
 .../org/apache/paimon/tag/TagTimeExtractor.java    |   1 +
 .../java/org/apache/paimon/utils/TagManager.java   |  11 +-
 .../test/java/org/apache/paimon/TestFileStore.java |   5 +-
 .../apache/paimon/operation/FileDeletionTest.java  |  28 ++--
 .../operation/UncleanedFileStoreExpireTest.java    |   2 +-
 .../sink/AutoTagForSavepointCommitterOperator.java |  18 ++-
 ...or.java => BatchWriteGeneratorTagOperator.java} | 142 ++++++++++-----------
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  11 +-
 .../AutoTagForSavepointCommitterOperatorTest.java  |   6 +-
 .../sink/BatchWriteGeneratorTagOperatorTest.java   | 102 +++++++++++++++
 23 files changed, 371 insertions(+), 188 deletions(-)
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
 copy paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/{AutoTagForSavepointCommitterOperator.java => BatchWriteGeneratorTagOperator.java} (62%)
 create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java


(incubator-paimon) 01/02: [flink] Generate tags when Flink Batch is completed

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 146a21ed55eceee1f867f4e4f85031e1d0ca1318
Author: siyang.zeng <si...@ly.com>
AuthorDate: Thu Dec 7 19:46:10 2023 +0800

    [flink] Generate tags when Flink Batch is completed
    
    This closes #2469
---
 docs/content/maintenance/manage-tags.md            |   1 +
 .../shortcodes/generated/core_configuration.html   |   4 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   6 +-
 .../java/org/apache/paimon/tag/TagPreview.java     |   3 +-
 .../org/apache/paimon/tag/TagTimeExtractor.java    |   1 +
 .../flink/sink/BatchWriteGeneratorTagOperator.java | 249 +++++++++++++++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   8 +
 .../sink/BatchWriteGeneratorTagOperatorTest.java   | 102 +++++++++
 8 files changed, 369 insertions(+), 5 deletions(-)

diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md
index 76afb7747..865e68b5e 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -42,6 +42,7 @@ Paimon supports automatic creation of tags in writing job.
 You can set `'tag.automatic-creation'` to `process-time` or `watermark`:
 - `process-time`: Create TAG based on the time of the machine.
 - `watermark`: Create TAG based on the watermark of the Sink input.
+- `batch`: In a batch processing scenario, a tag is generated after the current task is completed.
 
 {{< hint info >}}
 If you choose Watermark, you may need to specify the time zone of watermark, if watermark is not in the
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 3b916d95a..ea95883a2 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -303,7 +303,7 @@ This config option does not affect the default filesystem metastore.</td>
             <td><h5>metastore.tag-to-partition.preview</h5></td>
             <td style="word-wrap: break-word;">none</td>
             <td><p>Enum</p></td>
-            <td>Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li></ul></td>
+            <td>Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li><li>"batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.</li></ul></td>
         </tr>
         <tr>
             <td><h5>num-levels</h5></td>
@@ -579,7 +579,7 @@ This config option does not affect the default filesystem metastore.</td>
             <td><h5>tag.automatic-creation</h5></td>
             <td style="word-wrap: break-word;">none</td>
             <td><p>Enum</p></td>
-            <td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li></ul></td>
+            <td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li><li>"batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.</li></ul></td>
         </tr>
         <tr>
             <td><h5>tag.callback.#.param</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 506e924e2..83803f398 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1973,8 +1973,10 @@ public class CoreOptions implements Serializable {
                 "Based on the time of the machine, create TAG once the processing time passes period time plus delay."),
         WATERMARK(
                 "watermark",
-                "Based on the watermark of the input, create TAG once the watermark passes period time plus delay.");
-
+                "Based on the watermark of the input, create TAG once the watermark passes period time plus delay."),
+        BATCH(
+                "batch",
+                "In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.");
         private final String value;
         private final String description;
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
index 671e8841d..cd1a3fda1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
@@ -47,7 +47,8 @@ public class TagPreview {
     }
 
     public static TagPreview create(CoreOptions options) {
-        if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE) {
+        if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE
+                && options.tagToPartitionPreview() != CoreOptions.TagCreationMode.BATCH) {
             return new TagPreview(options);
         }
         return null;
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
index dadfde3a7..e017b2151 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -79,6 +79,7 @@ public interface TagTimeExtractor {
     static TagTimeExtractor create(CoreOptions.TagCreationMode mode, CoreOptions options) {
         switch (mode) {
             case NONE:
+            case BATCH:
                 return null;
             case PROCESS_TIME:
                 return new ProcessTimeExtractor();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
new file mode 100644
index 000000000..dff75ff29
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.SortedMap;
+
+/**
+ * Delegates every operator call to the wrapped {@link CommitterOperator}, which commits {@link
+ * Committable}s, and additionally tags the latest snapshot when {@link #finish()} is invoked.
+ */
+public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
+        implements OneInputStreamOperator<CommitT, CommitT>,
+                SetupableStreamOperator,
+                BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";
+
+    protected final FileStoreTable table;
+
+    private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;
+
+    public BatchWriteGeneratorTagOperator(
+            CommitterOperator<CommitT, GlobalCommitT> commitOperator, FileStoreTable table) {
+        this.commitOperator = commitOperator;
+        this.table = table;
+    }
+
+    @Override
+    public void setup(StreamTask containingTask, StreamConfig config, Output output) {
+        commitOperator.setup(containingTask, config, output);
+    }
+
+    @Override
+    public ChainingStrategy getChainingStrategy() {
+        return commitOperator.getChainingStrategy();
+    }
+
+    @Override
+    public void setChainingStrategy(ChainingStrategy strategy) {
+        commitOperator.setChainingStrategy(strategy);
+    }
+
+    @Override
+    public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
+            throws Exception {
+        commitOperator.initializeState(streamTaskStateManager);
+    }
+
+    @Override
+    public void open() throws Exception {
+        commitOperator.open();
+    }
+
+    @Override
+    public void processElement(StreamRecord<CommitT> element) throws Exception {
+        commitOperator.processElement(element);
+    }
+
+    @Override
+    public void processWatermark(Watermark watermark) throws Exception {
+        commitOperator.processWatermark(watermark);
+    }
+
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+        commitOperator.processWatermarkStatus(watermarkStatus);
+    }
+
+    @Override
+    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+        commitOperator.processLatencyMarker(latencyMarker);
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        commitOperator.endInput();
+    }
+
+    @Override
+    public void finish() throws Exception {
+        createTag();
+        commitOperator.finish();
+    }
+
+    @Override
+    public void close() throws Exception {
+        commitOperator.close();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        commitOperator.prepareSnapshotPreBarrier(checkpointId);
+    }
+
+    @Override
+    public OperatorSnapshotFutures snapshotState(
+            long checkpointId,
+            long timestamp,
+            CheckpointOptions checkpointOptions,
+            CheckpointStreamFactory storageLocation)
+            throws Exception {
+        return commitOperator.snapshotState(
+                checkpointId, timestamp, checkpointOptions, storageLocation);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        commitOperator.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        commitOperator.notifyCheckpointAborted(checkpointId);
+    }
+
+    @Override
+    public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception {
+        commitOperator.setKeyContextElement(record);
+    }
+
+    @Override
+    public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+        commitOperator.setKeyContextElement1(record);
+    }
+
+    @Override
+    public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+        commitOperator.setKeyContextElement2(record);
+    }
+
+    @Override
+    public void setCurrentKey(Object key) {
+        commitOperator.setCurrentKey(key);
+    }
+
+    @Override
+    public Object getCurrentKey() {
+        return commitOperator.getCurrentKey();
+    }
+
+    @Override
+    public OperatorMetricGroup getMetricGroup() {
+        return commitOperator.getMetricGroup();
+    }
+
+    @Override
+    public OperatorID getOperatorID() {
+        return commitOperator.getOperatorID();
+    }
+
+    /** Tags the latest snapshot as {@code batch-write-yyyy-MM-dd}; no-op without a snapshot. */
+    private void createTag() {
+        SnapshotManager snapshots = table.snapshotManager();
+        Snapshot latest = snapshots.latestSnapshot();
+        if (latest == null) {
+            return;
+        }
+        TagManager tags = table.tagManager();
+        TagDeletion deletion = table.store().newTagDeletion();
+        // The date part is snapshot.timeMillis() rendered in the system default time zone.
+        LocalDateTime snapshotTime =
+                LocalDateTime.ofInstant(
+                        Instant.ofEpochMilli(latest.timeMillis()), ZoneId.systemDefault());
+        String tagName =
+                BATCH_WRITE_TAG_PREFIX
+                        + DateTimeFormatter.ofPattern("yyyy-MM-dd").format(snapshotTime);
+        try {
+            // Replace any tag already using this name, then apply the retention limit.
+            dropTagIfPresent(tags, tagName, deletion, snapshots);
+            tags.createTag(latest, tagName);
+            expireTag();
+        } catch (Exception ignored) {
+            // Not rethrown: a tag still registered under this name is removed instead.
+            dropTagIfPresent(tags, tagName, deletion, snapshots);
+        }
+    }
+
+    private void dropTagIfPresent(
+            TagManager tags, String name, TagDeletion deletion, SnapshotManager snapshots) {
+        if (tags.tagExists(name)) {
+            tags.deleteTag(name, deletion, snapshots);
+        }
+    }
+
+    /** Deletes the first {@code size - tagNumRetainedMax} tags in {@code tags()} order. */
+    private void expireTag() {
+        Integer retainedMax = table.coreOptions().tagNumRetainedMax();
+        if (retainedMax == null) {
+            return;
+        }
+        SnapshotManager snapshots = table.snapshotManager();
+        if (snapshots.latestSnapshot() == null) {
+            return;
+        }
+        TagManager tags = table.tagManager();
+        TagDeletion deletion = table.store().newTagDeletion();
+        SortedMap<Snapshot, String> existing = tags.tags();
+        long surplus = Math.max(0L, (long) existing.size() - retainedMax);
+        existing.values().stream()
+                .limit(surplus)
+                .forEach(name -> tags.deleteTag(name, deletion, snapshots));
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index c9a6f3c05..480d41ae8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.TagCreationMode;
 import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
@@ -203,6 +204,13 @@ public abstract class FlinkSink<T> implements Serializable {
                             table::tagManager,
                             () -> table.store().newTagDeletion());
         }
+        if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
+                && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
+            committerOperator =
+                    new BatchWriteGeneratorTagOperator<>(
+                            (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
+                            table);
+        }
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME + " : " + table.name(),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
new file mode 100644
index 000000000..894410dae
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestCommittableSerializer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that {@link BatchWriteGeneratorTagOperator} tags the latest snapshot on finish. */
+public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest {
+
+    @Test
+    public void testBatchWriteGeneratorTag() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        // Set tag.automatic-creation = batch and tag.num-retained-max = 2 as dynamic options
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put("tag.automatic-creation", "batch");
+        dynamicOptions.put("tag.num-retained-max", "2");
+        table = table.copy(dynamicOptions);
+
+        StreamTableWrite write =
+                table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+        OneInputStreamOperator<Committable, Committable> committerOperator =
+                createCommitterOperator(
+                        table,
+                        initialCommitUser,
+                        new RestoreAndFailCommittableStateManager<>(
+                                () ->
+                                        new VersionedSerializerWrapper<>(
+                                                new ManifestCommittableSerializer())));
+
+        TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
+
+        write.write(GenericRow.of(1, 10L)); // one row, committed directly on the next line
+        tableCommit.commit(write.prepareCommit(false, 1));
+
+        SnapshotManager snapshotManager = table.newSnapshotReader().snapshotManager();
+        TagManager tagManager = table.tagManager();
+
+        // Expected tag name: "batch-write-" + latest snapshot date (yyyy-MM-dd, system zone)
+        String prefix = "batch-write-";
+        Instant instant =
+                Instant.ofEpochMilli(
+                        Objects.requireNonNull(snapshotManager.latestSnapshot()).timeMillis());
+        LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+        String tagName = prefix + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+
+        // No tag exists before finish() is called
+        assertThat(table.tagManager().tagCount()).isEqualTo(0);
+        committerOperator.finish();
+        // finish() leaves exactly one tag
+        assertThat(table.tagManager().tagCount()).isEqualTo(1);
+        // The tag under the expected name resolves to the latest snapshot
+        assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot());
+    }
+
+    @Override
+    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(
+            FileStoreTable table,
+            String commitUser,
+            CommittableStateManager<ManifestCommittable> committableStateManager) {
+        return new BatchWriteGeneratorTagOperator<>( // wraps the committer built by the base test
+                (CommitterOperator<Committable, ManifestCommittable>)
+                        super.createCommitterOperator(table, commitUser, committableStateManager),
+                table);
+    }
+}


(incubator-paimon) 02/02: [core] Fix that some tag creations haven't handle tag callbacks (#2550)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 570e52792472aff00d491c96fd842b35bfb8d218
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Thu Dec 21 13:27:17 2023 +0800

    [core] Fix that some tag creations haven't handle tag callbacks (#2550)
---
 .../java/org/apache/paimon/AbstractFileStore.java  | 31 ++++++++-
 .../org/apache/paimon/AppendOnlyFileStore.java     |  6 +-
 .../src/main/java/org/apache/paimon/FileStore.java |  4 ++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  6 +-
 .../paimon/table/AbstractFileStoreTable.java       | 76 ++--------------------
 .../paimon/table/AppendOnlyFileStoreTable.java     |  3 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  3 +-
 .../apache/paimon/table/sink/CallbackUtils.java    | 75 +++++++++++++++++++++
 .../org/apache/paimon/tag/TagAutoCreation.java     | 15 +++--
 .../java/org/apache/paimon/utils/TagManager.java   | 11 +++-
 .../test/java/org/apache/paimon/TestFileStore.java |  5 +-
 .../apache/paimon/operation/FileDeletionTest.java  | 28 ++++----
 .../operation/UncleanedFileStoreExpireTest.java    |  2 +-
 .../sink/AutoTagForSavepointCommitterOperator.java | 18 +++--
 .../flink/sink/BatchWriteGeneratorTagOperator.java |  2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  3 +-
 .../AutoTagForSavepointCommitterOperatorTest.java  |  6 +-
 17 files changed, 187 insertions(+), 107 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 344118cd0..3615b488e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -25,6 +25,8 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.PartitionExpire;
@@ -32,6 +34,9 @@ import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.CallbackUtils;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -42,7 +47,9 @@ import org.apache.paimon.utils.TagManager;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 
 /**
  * Base {@link FileStore} implementation.
@@ -56,6 +63,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     protected final long schemaId;
     protected final CoreOptions options;
     protected final RowType partitionType;
+    private final CatalogEnvironment catalogEnvironment;
 
     @Nullable private final SegmentsCache<String> writeManifestCache;
 
@@ -64,12 +72,14 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
             SchemaManager schemaManager,
             long schemaId,
             CoreOptions options,
-            RowType partitionType) {
+            RowType partitionType,
+            CatalogEnvironment catalogEnvironment) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schemaId = schemaId;
         this.options = options;
         this.partitionType = partitionType;
+        this.catalogEnvironment = catalogEnvironment;
         MemorySize writeManifestCache = options.writeManifestCache();
         this.writeManifestCache =
                 writeManifestCache.getBytes() == 0
@@ -230,6 +240,23 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     @Nullable
     public TagAutoCreation newTagCreationManager() {
         return TagAutoCreation.create(
-                options, snapshotManager(), newTagManager(), newTagDeletion());
+                options,
+                snapshotManager(),
+                newTagManager(),
+                newTagDeletion(),
+                createTagCallbacks());
+    }
+
+    @Override
+    public List<TagCallback> createTagCallbacks() {
+        // Start from the callbacks loaded from the options; optionally add the partition one.
+        List<TagCallback> result = new ArrayList<>(CallbackUtils.loadTagCallbacks(options));
+        String field = options.tagToPartitionField();
+        MetastoreClient.Factory factory = catalogEnvironment.metastoreClientFactory();
+        if (field == null || factory == null) {
+            return result;
+        }
+        result.add(new AddPartitionTagCallback(factory.create(), field));
+        return result;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index a36d02bd8..ec1e7cb58 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -29,6 +29,7 @@ import org.apache.paimon.operation.ScanBucketFilter;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
 
 import java.util.Comparator;
@@ -53,8 +54,9 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
             RowType partitionType,
             RowType bucketKeyType,
             RowType rowType,
-            String tableName) {
-        super(fileIO, schemaManager, schemaId, options, partitionType);
+            String tableName,
+            CatalogEnvironment catalogEnvironment) {
+        super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
         this.bucketKeyType = bucketKeyType;
         this.rowType = rowType;
         this.tableName = tableName;
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index e67cf9f1e..f044102dc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -31,6 +31,7 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -40,6 +41,7 @@ import org.apache.paimon.utils.TagManager;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.List;
 
 /**
  * File store interface.
@@ -89,4 +91,6 @@ public interface FileStore<T> extends Serializable {
     TagAutoCreation newTagCreationManager();
 
     boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
+
+    List<TagCallback> createTagCallbacks();
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 98b88323f..43aadcbfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -34,6 +34,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -79,8 +80,9 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
             RowType valueType,
             KeyValueFieldsExtractor keyValueFieldsExtractor,
             MergeFunctionFactory<KeyValue> mfFactory,
-            String tableName) {
-        super(fileIO, schemaManager, schemaId, options, partitionType);
+            String tableName,
+            CatalogEnvironment catalogEnvironment) {
+        super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
         this.crossPartitionUpdate = crossPartitionUpdate;
         this.bucketKeyType = bucketKeyType;
         this.keyType = keyType;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7e206d787..423ed58ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -34,12 +34,12 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaValidation;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.CallbackUtils;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerStreamTableScanImpl;
@@ -50,15 +50,12 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import org.apache.paimon.tag.TagPreview;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -269,7 +266,8 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
     }
 
     private List<CommitCallback> createCommitCallbacks() {
-        List<CommitCallback> callbacks = new ArrayList<>(loadCommitCallbacks());
+        List<CommitCallback> callbacks =
+                new ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions()));
         CoreOptions options = coreOptions();
         MetastoreClient.Factory metastoreClientFactory =
                 catalogEnvironment.metastoreClientFactory();
@@ -293,62 +291,6 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
         return callbacks;
     }
 
-    private List<TagCallback> createTagCallbacks() {
-        List<TagCallback> callbacks = new ArrayList<>(loadTagCallbacks());
-        String partitionField = coreOptions().tagToPartitionField();
-        MetastoreClient.Factory metastoreClientFactory =
-                catalogEnvironment.metastoreClientFactory();
-        if (partitionField != null && metastoreClientFactory != null) {
-            callbacks.add(
-                    new AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
-        }
-        return callbacks;
-    }
-
-    private List<TagCallback> loadTagCallbacks() {
-        return loadCallbacks(coreOptions().tagCallbacks(), TagCallback.class);
-    }
-
-    private List<CommitCallback> loadCommitCallbacks() {
-        return loadCallbacks(coreOptions().commitCallbacks(), CommitCallback.class);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <T> List<T> loadCallbacks(Map<String, String> clazzParamMaps, Class<T> expectClass) {
-        List<T> result = new ArrayList<>();
-
-        for (Map.Entry<String, String> classParamEntry : clazzParamMaps.entrySet()) {
-            String className = classParamEntry.getKey();
-            String param = classParamEntry.getValue();
-
-            Class<?> clazz;
-            try {
-                clazz = Class.forName(className, true, this.getClass().getClassLoader());
-            } catch (ClassNotFoundException e) {
-                throw new RuntimeException(e);
-            }
-
-            Preconditions.checkArgument(
-                    expectClass.isAssignableFrom(clazz),
-                    "Class " + clazz + " must implement " + expectClass);
-
-            try {
-                if (param == null) {
-                    result.add((T) clazz.newInstance());
-                } else {
-                    result.add((T) clazz.getConstructor(String.class).newInstance(param));
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(
-                        "Failed to initialize commit callback "
-                                + className
-                                + (param == null ? "" : " with param " + param),
-                        e);
-            }
-        }
-        return result;
-    }
-
     private Optional<TableSchema> tryTimeTravel(Options options) {
         CoreOptions coreOptions = new CoreOptions(options);
 
@@ -404,17 +346,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
                 fromSnapshotId);
 
         Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
-        tagManager().createTag(snapshot, tagName);
-
-        List<TagCallback> callbacks = Collections.emptyList();
-        try {
-            callbacks = createTagCallbacks();
-            callbacks.forEach(callback -> callback.notifyCreation(tagName));
-        } finally {
-            for (TagCallback tagCallback : callbacks) {
-                IOUtils.closeQuietly(tagCallback);
-            }
-        }
+        tagManager().createTag(snapshot, tagName, store().createTagCallbacks());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index dc865d93c..5f2c479e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -81,7 +81,8 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
                             tableSchema.logicalPartitionType(),
                             tableSchema.logicalBucketKeyType(),
                             tableSchema.logicalRowType(),
-                            name());
+                            name(),
+                            catalogEnvironment);
         }
         return lazyStore;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 84a586066..fc4db704d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -109,7 +109,8 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
                             rowType,
                             extractor,
                             mfFactory,
-                            name());
+                            name(),
+                            catalogEnvironment);
         }
         return lazyStore;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
new file mode 100644
index 000000000..7d8a0a849
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils to load callbacks.
+ *
+ * <p>Each callback is described by a class name mapped to an optional constructor parameter, as
+ * returned by {@link CoreOptions#tagCallbacks()} and {@link CoreOptions#commitCallbacks()}.
+ */
+public class CallbackUtils {
+
+    /** Loads the tag callbacks described by {@code coreOptions.tagCallbacks()}. */
+    public static List<TagCallback> loadTagCallbacks(CoreOptions coreOptions) {
+        return loadCallbacks(coreOptions.tagCallbacks(), TagCallback.class, "tag callback");
+    }
+
+    /** Loads the commit callbacks described by {@code coreOptions.commitCallbacks()}. */
+    public static List<CommitCallback> loadCommitCallbacks(CoreOptions coreOptions) {
+        return loadCallbacks(
+                coreOptions.commitCallbacks(), CommitCallback.class, "commit callback");
+    }
+
+    /**
+     * Instantiates one callback per entry of {@code clazzParamMaps}.
+     *
+     * @param clazzParamMaps class name to constructor parameter; a {@code null} parameter selects
+     *     the no-arg constructor, otherwise the single-{@code String} constructor is used
+     * @param expectClass type that every loaded class must be assignable to
+     * @param callbackKind human-readable kind of callback, only used in the failure message
+     * @return the instantiated callbacks, in the iteration order of {@code clazzParamMaps}
+     * @throws RuntimeException if a class cannot be found or cannot be instantiated
+     */
+    private static <T> List<T> loadCallbacks(
+            Map<String, String> clazzParamMaps, Class<T> expectClass, String callbackKind) {
+        List<T> result = new ArrayList<>();
+
+        for (Map.Entry<String, String> classParamEntry : clazzParamMaps.entrySet()) {
+            String className = classParamEntry.getKey();
+            String param = classParamEntry.getValue();
+
+            Class<?> clazz;
+            try {
+                clazz = Class.forName(className, true, CallbackUtils.class.getClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+
+            Preconditions.checkArgument(
+                    expectClass.isAssignableFrom(clazz),
+                    "Class " + clazz + " must implement " + expectClass);
+
+            try {
+                // The assignability check above makes expectClass.cast() safe, so no unchecked
+                // cast is needed. Constructor.newInstance replaces the deprecated
+                // Class.newInstance.
+                if (param == null) {
+                    result.add(expectClass.cast(clazz.getDeclaredConstructor().newInstance()));
+                } else {
+                    result.add(
+                            expectClass.cast(
+                                    clazz.getConstructor(String.class).newInstance(param)));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to initialize "
+                                + callbackKind
+                                + " "
+                                + className
+                                + (param == null ? "" : " with param " + param),
+                        e);
+            }
+        }
+        return result;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index ba7069bd8..3b15d4891 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -21,6 +21,7 @@ package org.apache.paimon.tag;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -29,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.List;
 import java.util.Optional;
 import java.util.SortedMap;
 
@@ -45,6 +47,7 @@ public class TagAutoCreation {
     private final TagPeriodHandler periodHandler;
     private final Duration delay;
     private final Integer numRetainedMax;
+    private final List<TagCallback> callbacks;
 
     private LocalDateTime nextTag;
     private long nextSnapshot;
@@ -56,7 +59,8 @@ public class TagAutoCreation {
             TagTimeExtractor timeExtractor,
             TagPeriodHandler periodHandler,
             Duration delay,
-            Integer numRetainedMax) {
+            Integer numRetainedMax,
+            List<TagCallback> callbacks) {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
         this.tagDeletion = tagDeletion;
@@ -64,6 +68,7 @@ public class TagAutoCreation {
         this.periodHandler = periodHandler;
         this.delay = delay;
         this.numRetainedMax = numRetainedMax;
+        this.callbacks = callbacks;
 
         this.periodHandler.validateDelay(delay);
 
@@ -127,7 +132,7 @@ public class TagAutoCreation {
                 || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) {
             LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
             String tagName = periodHandler.timeToTag(thisTag);
-            tagManager.createTag(snapshot, tagName);
+            tagManager.createTag(snapshot, tagName, callbacks);
             nextTag = periodHandler.nextTagTime(thisTag);
 
             if (numRetainedMax != null) {
@@ -156,7 +161,8 @@ public class TagAutoCreation {
             CoreOptions options,
             SnapshotManager snapshotManager,
             TagManager tagManager,
-            TagDeletion tagDeletion) {
+            TagDeletion tagDeletion,
+            List<TagCallback> callbacks) {
         TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options);
         if (extractor == null) {
             return null;
@@ -168,6 +174,7 @@ public class TagAutoCreation {
                 extractor,
                 TagPeriodHandler.create(options),
                 options.tagCreationDelay(),
-                options.tagNumRetainedMax());
+                options.tagNumRetainedMax(),
+                callbacks);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index bd526ee2b..a83e201d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@ public class TagManager {
     }
 
     /** Create a tag from given snapshot and save it in the storage. */
-    public void createTag(Snapshot snapshot, String tagName) {
+    public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
         checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
         checkArgument(
@@ -85,6 +86,14 @@ public class TagManager {
                             tagName, newTagPath),
                     e);
         }
+
+        try {
+            callbacks.forEach(callback -> callback.notifyCreation(tagName));
+        } finally {
+            for (TagCallback tagCallback : callbacks) {
+                IOUtils.closeQuietly(tagCallback);
+            }
+        }
     }
 
     public void deleteTag(
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index c8fc99432..0a9b2bf5b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -39,11 +39,13 @@ import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -113,7 +115,8 @@ public class TestFileStore extends KeyValueFileStore {
                 valueType,
                 keyValueFieldsExtractor,
                 mfFactory,
-                (new Path(root)).getName());
+                (new Path(root)).getName(),
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
         this.root = root;
         this.fileIO = FileIOFinder.find(new Path(root));
         this.keySerializer = new InternalRowSerializer(keyType);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 30b28bd57..7556e6ead 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -76,11 +76,13 @@ public class FileDeletionTest {
 
     private long commitIdentifier;
     private String root;
+    private TagManager tagManager;
 
     @BeforeEach
     public void setup() throws Exception {
         commitIdentifier = 0L;
         root = tempDir.toString();
+        tagManager = null;
     }
 
     /**
@@ -297,7 +299,7 @@ public class FileDeletionTest {
     @Test
     public void testExpireWithExistingTags() throws Exception {
         TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -313,7 +315,7 @@ public class FileDeletionTest {
 
         // step 2: commit -A (by clean bucket 0) and create tag1
         cleanBucket(store, gen.getPartition(gen.next()), 0);
-        tagManager.createTag(snapshotManager.snapshot(2), "tag1");
+        createTag(snapshotManager.snapshot(2), "tag1");
         assertThat(tagManager.tagExists("tag1")).isTrue();
 
         // step 3: commit C to bucket 2
@@ -324,7 +326,7 @@ public class FileDeletionTest {
 
         // step 4: commit -B (by clean bucket 1) and create tag2
         cleanBucket(store, partition, 1);
-        tagManager.createTag(snapshotManager.snapshot(4), "tag2");
+        createTag(snapshotManager.snapshot(4), "tag2");
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
         // step 5: commit D to bucket 3
@@ -375,7 +377,7 @@ public class FileDeletionTest {
     @Test
     public void testExpireWithUpgradeAndTags() throws Exception {
         TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -404,7 +406,7 @@ public class FileDeletionTest {
         // snapshot 3: commit -A (by clean bucket 0)
         cleanBucket(store, gen.getPartition(gen.next()), 0);
 
-        tagManager.createTag(snapshotManager.snapshot(1), "tag1");
+        createTag(snapshotManager.snapshot(1), "tag1");
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
 
         // check data file and manifests
@@ -429,7 +431,7 @@ public class FileDeletionTest {
     @Test
     public void testDeleteTagWithSnapshot() throws Exception {
         TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -461,7 +463,7 @@ public class FileDeletionTest {
                 Arrays.asList(snapshot1.baseManifestList(), snapshot1.deltaManifestList());
 
         // create tag1
-        tagManager.createTag(snapshot1, "tag1");
+        createTag(snapshot1, "tag1");
 
         // expire snapshot 1, 2
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -502,7 +504,7 @@ public class FileDeletionTest {
     @Test
     public void testDeleteTagWithOtherTag() throws Exception {
         TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -535,9 +537,9 @@ public class FileDeletionTest {
                 Arrays.asList(snapshot2.baseManifestList(), snapshot2.deltaManifestList());
 
         // create tags
-        tagManager.createTag(snapshotManager.snapshot(1), "tag1");
-        tagManager.createTag(snapshotManager.snapshot(2), "tag2");
-        tagManager.createTag(snapshotManager.snapshot(4), "tag3");
+        createTag(snapshotManager.snapshot(1), "tag1");
+        createTag(snapshotManager.snapshot(2), "tag2");
+        createTag(snapshotManager.snapshot(4), "tag3");
 
         // expire snapshot 1, 2, 3, 4
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -779,4 +781,8 @@ public class FileDeletionTest {
                         store.snapshotManager().latestSnapshot(),
                         null);
     }
+
+    private void createTag(Snapshot snapshot, String tagName) {
+        tagManager.createTag(snapshot, tagName, Collections.emptyList());
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index ecaf5ae8e..a0794168f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -97,7 +97,7 @@ public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase {
         // create tags for each snapshot
         for (int id = 1; id <= latestSnapshotId; id++) {
             Snapshot snapshot = snapshotManager.snapshot(id);
-            tagManager.createTag(snapshot, "tag" + id);
+            tagManager.createTag(snapshot, "tag" + id, Collections.emptyList());
         }
 
         // randomly expire snapshots
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index 02c5b9606..da3425e9b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.utils.SerializableSupplier;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -71,13 +72,17 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
 
     private final SerializableSupplier<TagDeletion> tagDeletionFactory;
 
+    private final SerializableSupplier<List<TagCallback>> callbacksSupplier;
+
     private final NavigableSet<Long> identifiersForTags;
 
-    protected SnapshotManager snapshotManager;
+    private transient SnapshotManager snapshotManager;
+
+    private transient TagManager tagManager;
 
-    protected TagManager tagManager;
+    private transient TagDeletion tagDeletion;
 
-    protected TagDeletion tagDeletion;
+    private transient List<TagCallback> callbacks;
 
     private transient ListState<Long> identifiersForTagsState;
 
@@ -85,11 +90,13 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
             CommitterOperator<CommitT, GlobalCommitT> commitOperator,
             SerializableSupplier<SnapshotManager> snapshotManagerFactory,
             SerializableSupplier<TagManager> tagManagerFactory,
-            SerializableSupplier<TagDeletion> tagDeletionFactory) {
+            SerializableSupplier<TagDeletion> tagDeletionFactory,
+            SerializableSupplier<List<TagCallback>> callbacksSupplier) {
         this.commitOperator = commitOperator;
         this.tagManagerFactory = tagManagerFactory;
         this.snapshotManagerFactory = snapshotManagerFactory;
         this.tagDeletionFactory = tagDeletionFactory;
+        this.callbacksSupplier = callbacksSupplier;
         this.identifiersForTags = new TreeSet<>();
     }
 
@@ -102,6 +109,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
             snapshotManager = snapshotManagerFactory.get();
             tagManager = tagManagerFactory.get();
             tagDeletion = tagDeletionFactory.get();
+            callbacks = callbacksSupplier.get();
 
             identifiersForTagsState =
                     commitOperator
@@ -159,7 +167,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
         for (Snapshot snapshot : snapshotForTags) {
             String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier();
             if (!tagManager.tagExists(tagName)) {
-                tagManager.createTag(snapshot, tagName);
+                tagManager.createTag(snapshot, tagName, callbacks);
             }
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index dff75ff29..36ae32d15 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -117,7 +117,7 @@ public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
                 tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
             }
             // Create a new tag
-            tagManager.createTag(snapshot, tagName);
+            tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks());
             // Expire the tag
             expireTag();
         } catch (Exception e) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 480d41ae8..7d3cecdbf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -202,7 +202,8 @@ public abstract class FlinkSink<T> implements Serializable {
                             (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
                             table::snapshotManager,
                             table::tagManager,
-                            () -> table.store().newTagDeletion());
+                            () -> table.store().newTagDeletion(),
+                            () -> table.store().createTagCallbacks());
         }
         if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
                 && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index 60839805f..880e052c7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -206,7 +206,8 @@ public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorT
                         super.createCommitterOperator(table, commitUser, committableStateManager),
                 table::snapshotManager,
                 table::tagManager,
-                () -> table.store().newTagDeletion());
+                () -> table.store().newTagDeletion(),
+                () -> table.store().createTagCallbacks());
     }
 
     @Override
@@ -221,6 +222,7 @@ public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorT
                                 table, commitUser, committableStateManager, initializeFunction),
                 table::snapshotManager,
                 table::tagManager,
-                () -> table.store().newTagDeletion());
+                () -> table.store().newTagDeletion(),
+                () -> table.store().createTagCallbacks());
     }
 }