Posted to commits@paimon.apache.org by yu...@apache.org on 2024/01/10 08:02:38 UTC

(incubator-paimon) branch master updated: [flink] Cdc ingestion support watermark strategy (#2640)

This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7992ee910 [flink] Cdc ingestion support watermark strategy (#2640)
7992ee910 is described below

commit 7992ee910445c2f9a0466538e3747dcd92b146b5
Author: monster <60...@users.noreply.github.com>
AuthorDate: Wed Jan 10 16:02:32 2024 +0800

    [flink] Cdc ingestion support watermark strategy (#2640)
---
 .../org/apache/paimon/utils/JsonSerdeUtil.java     |  25 +++++
 .../action/cdc/SynchronizationActionBase.java      |  37 ++++++-
 .../watermark/CdcTimestampExtractorFactory.java    | 121 +++++++++++++++++++++
 .../action/cdc/watermark/CdcWatermarkStrategy.java |  70 ++++++++++++
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |  36 ++++++
 .../kafka/KafkaDebeziumSyncTableActionITCase.java  |  36 ++++++
 .../kafka/KafkaMaxwellSyncTableActionITCase.java   |  36 ++++++
 .../cdc/kafka/KafkaOggSyncTableActionITCase.java   |  36 ++++++
 .../cdc/pulsar/PulsarSyncTableActionITCase.java    |  42 +++++++
 .../kafka/canal/table/watermark/canal-data-1.txt   |  20 ++++
 .../debezium/table/watermark/debezium-data-1.txt   |  20 ++++
 .../maxwell/table/watermark/maxwell-data-1.txt     |  20 ++++
 .../kafka/ogg/table/watermark/ogg-data-1.txt       |  20 ++++
 13 files changed, 518 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 1477d55ce..19a57ac58 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -208,6 +208,31 @@ public class JsonSerdeUtil {
         return clazz.cast(resultNode);
     }
 
+    /** Parses a JSON string and extracts a value of the specified type from the given path keys. */
+    public static <T> T extractValue(String json, Class<T> valueType, String... path)
+            throws JsonProcessingException {
+        JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
+        for (String key : path) {
+            currentNode = currentNode.get(key);
+            if (currentNode == null) {
+                throw new IllegalArgumentException("Invalid path or key not found: " + key);
+            }
+        }
+        return OBJECT_MAPPER_INSTANCE.treeToValue(currentNode, valueType);
+    }
+
+    /** Checks if a specified node exists in a JSON string. */
+    public static boolean isNodeExists(String json, String... path) throws JsonProcessingException {
+        JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
+        for (String key : path) {
+            currentNode = currentNode.get(key);
+            if (currentNode == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     public static boolean isNull(JsonNode jsonNode) {
         return jsonNode == null || jsonNode.isNull();
     }
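
For readers skimming the diff, a quick usage sketch of the two new helpers (illustrative only, not part of the commit; the JSON literal is made up, and both calls can throw JsonProcessingException):

    String json = "{\"payload\":{\"ts_ms\":1596684883000}}";

    // extractValue walks the path keys and converts the leaf node to the target type.
    Long tsMs = JsonSerdeUtil.extractValue(json, Long.class, "payload", "ts_ms"); // 1596684883000L

    // isNodeExists walks the same path but only reports presence.
    boolean hasTs = JsonSerdeUtil.isNodeExists(json, "payload", "ts_ms"); // true
    boolean hasOp = JsonSerdeUtil.isNodeExists(json, "op");               // false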
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 3cc77e6f5..6d5344f5c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -18,11 +18,14 @@
 
 package org.apache.paimon.flink.action.cdc;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.options.Options;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -32,11 +35,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.createExtractor;
+
 /** Base {@link Action} for table/database synchronizing job. */
 public abstract class SynchronizationActionBase extends ActionBase {
 
@@ -123,9 +135,32 @@ public abstract class SynchronizationActionBase extends ActionBase {
 
     private DataStreamSource<String> buildDataStreamSource(Object source) {
         if (source instanceof Source) {
+            boolean isAutomaticWatermarkCreationEnabled =
+                    tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
+                            && Objects.equals(
+                                    tableConfig.get(CoreOptions.TAG_AUTOMATIC_CREATION.key()),
+                                    WATERMARK.toString());
+
+            Options options = Options.fromMap(tableConfig);
+            Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
+            String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
+            WatermarkStrategy<String> watermarkStrategy =
+                    isAutomaticWatermarkCreationEnabled
+                            ? watermarkAlignGroup != null
+                                    ? new CdcWatermarkStrategy(createExtractor(source))
+                                            .withWatermarkAlignment(
+                                                    watermarkAlignGroup,
+                                                    options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
+                                                    options.get(
+                                                            SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL))
+                                    : new CdcWatermarkStrategy(createExtractor(source))
+                            : WatermarkStrategy.noWatermarks();
+            if (idleTimeout != null) {
+                watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
+            }
             return env.fromSource(
                     (Source<String, ?, ?>) source,
-                    WatermarkStrategy.noWatermarks(),
+                    watermarkStrategy,
                     syncJobHandler.provideSourceName());
         }
         if (source instanceof SourceFunction) {
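
The nested ternary above is dense; the same control flow reads more easily as plain if/else (a sketch using the names from the diff above, not the committed code):

    WatermarkStrategy<String> watermarkStrategy;
    if (!isAutomaticWatermarkCreationEnabled) {
        // 'tag.automatic-creation' is not set to 'watermark': keep the previous behavior.
        watermarkStrategy = WatermarkStrategy.noWatermarks();
    } else if (watermarkAlignGroup != null) {
        // Alignment requested: bound watermark drift across sources in the group.
        watermarkStrategy =
                new CdcWatermarkStrategy(createExtractor(source))
                        .withWatermarkAlignment(
                                watermarkAlignGroup,
                                options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
    } else {
        watermarkStrategy = new CdcWatermarkStrategy(createExtractor(source));
    }
    if (idleTimeout != null) {
        watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
    }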
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
new file mode 100644
index 000000000..0e91353f9
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.watermark;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/** Factory for creating CDC timestamp extractors based on different source types. */
+public class CdcTimestampExtractorFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Map<Class<?>, Supplier<CdcTimestampExtractor>> extractorMap =
+            new HashMap<>();
+
+    static {
+        extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new);
+        extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new);
+        extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new);
+        extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new);
+    }
+
+    public static CdcTimestampExtractor createExtractor(Object source) {
+        Supplier<CdcTimestampExtractor> extractorSupplier = extractorMap.get(source.getClass());
+        if (extractorSupplier != null) {
+            return extractorSupplier.get();
+        }
+        throw new IllegalArgumentException(
+                "Unsupported source type: " + source.getClass().getName());
+    }
+
+    /** Timestamp extractor for MongoDB sources in CDC applications. */
+    public static class MongoDBCdcTimestampExtractor implements CdcTimestampExtractor {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public long extractTimestamp(String record) throws JsonProcessingException {
+            return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
+        }
+    }
+
+    /** Timestamp extractor for Kafka/Pulsar sources in CDC applications. */
+    public static class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public long extractTimestamp(String record) throws JsonProcessingException {
+            if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
+                // Canal json
+                return JsonSerdeUtil.extractValue(record, Long.class, "ts");
+            } else if (JsonSerdeUtil.isNodeExists(record, "pos")) {
+                // Ogg json
+                String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts");
+                DateTimeFormatter formatter =
+                        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+                LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
+                return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+            } else if (JsonSerdeUtil.isNodeExists(record, "xid")) {
+                // Maxwell json
+                return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000;
+            } else if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) {
+                // Dbz json
+                return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
+            } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
+                // Dbz json
+                return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
+            }
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s",
+                            record));
+        }
+    }
+
+    /** Timestamp extractor for MySQL sources in CDC applications. */
+    public static class MysqlCdcTimestampExtractor implements CdcTimestampExtractor {
+
+        @Override
+        public long extractTimestamp(String record) throws JsonProcessingException {
+            return JsonSerdeUtil.extractValue(record, Long.class, "source", "ts_ms");
+        }
+    }
+
+    /** Interface defining the contract for CDC timestamp extraction. */
+    public interface CdcTimestampExtractor extends Serializable {
+
+        long extractTimestamp(String record) throws JsonProcessingException;
+    }
+}
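
Note that createExtractor dispatches on the exact runtime class (extractorMap.get(source.getClass())), so a subclass of, say, KafkaSource would not match. To illustrate the message-queue branches with records shaped like the test fixtures below (a sketch, assuming the shaded Jackson classes are on the classpath):

    MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();

    // Canal JSON carries "mysqlType", so the first branch reads the top-level
    // millisecond-precision "ts":
    long canalTs = extractor.extractTimestamp(
            "{\"mysqlType\":{\"pt\":\"INT\"},\"ts\":1683006706728,\"type\":\"INSERT\"}");
    // -> 1683006706728L

    // Maxwell JSON carries "xid" and a second-precision "ts", hence the * 1000:
    long maxwellTs = extractor.extractTimestamp(
            "{\"xid\":7125,\"ts\":1596684883,\"type\":\"insert\"}");
    // -> 1596684883000L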
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
new file mode 100644
index 000000000..218d45d63
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.watermark;
+
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.CdcTimestampExtractor;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+
+/**
+ * Watermark strategy for CDC sources, generating watermarks based on timestamps extracted from
+ * records.
+ */
+public class CdcWatermarkStrategy implements WatermarkStrategy<String> {
+
+    private final CdcTimestampExtractor timestampExtractor;
+    private static final long serialVersionUID = 1L;
+    private long currentMaxTimestamp;
+
+    public CdcWatermarkStrategy(CdcTimestampExtractor extractor) {
+        this.timestampExtractor = extractor;
+    }
+
+    @Override
+    public WatermarkGenerator<String> createWatermarkGenerator(
+            WatermarkGeneratorSupplier.Context context) {
+        return new WatermarkGenerator<String>() {
+
+            @Override
+            public void onEvent(String record, long timestamp, WatermarkOutput output) {
+                long tMs;
+                try {
+                    tMs = timestampExtractor.extractTimestamp(record);
+                } catch (JsonProcessingException e) {
+                    throw new RuntimeException(e);
+                }
+                currentMaxTimestamp = Math.max(currentMaxTimestamp, tMs);
+                output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
+            }
+
+            @Override
+            public void onPeriodicEmit(WatermarkOutput output) {
+                long timeMillis = System.currentTimeMillis();
+                currentMaxTimestamp = Math.max(timeMillis, currentMaxTimestamp);
+                output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
+            }
+        };
+    }
+}
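
In Flink terms, Watermark(t) declares that no later element with timestamp <= t is expected. So for two records with extracted timestamps t1 < t2 the generator emits Watermark(t1 - 1) and then Watermark(t2 - 1), while onPeriodicEmit (fired at Flink's auto-watermark interval, 200 ms by default via pipeline.auto-watermark-interval) additionally ratchets the watermark up to wall-clock time, so watermark-driven tag creation keeps advancing even when the source is idle. Wiring it up looks like this (a sketch; kafkaSource stands for whatever source the sync action built):

    WatermarkStrategy<String> strategy =
            new CdcWatermarkStrategy(CdcTimestampExtractorFactory.createExtractor(kafkaSource));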
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index ed8930d41..e81ed986d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -1241,4 +1243,38 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                 rowType,
                 Collections.singletonList("_id"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testWaterMarkSyncTable() throws Exception {
+        String topic = "watermark";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, readLines("kafka/canal/table/watermark/canal-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+
+        Map<String, String> config = getBasicTableConfig();
+        config.put("tag.automatic-creation", "watermark");
+        config.put("tag.creation-period", "hourly");
+        config.put("scan.watermark.alignment.group", "alignment-group-1");
+        config.put("scan.watermark.alignment.max-drift", "20 s");
+        config.put("scan.watermark.alignment.update-interval", "1 s");
+
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+        runActionWithDefaultEnv(action);
+
+        AbstractFileStoreTable table =
+                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        while (true) {
+            if (table.snapshotManager().snapshotCount() > 0
+                    && table.snapshotManager().latestSnapshot().watermark()
+                            != -9223372036854775808L) {
+                return;
+            }
+            Thread.sleep(1000);
+        }
+    }
 }
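
The literal -9223372036854775808L in the wait loop is Long.MIN_VALUE, evidently the sentinel a snapshot carries before any watermark has been recorded; the loop polls until the first snapshot with a real watermark lands. The same loop recurs verbatim in the Debezium, Maxwell, Ogg, and Pulsar tests below. An equivalent, more readable spelling (a sketch, not the committed code):

    while (table.snapshotManager().snapshotCount() == 0
            || table.snapshotManager().latestSnapshot().watermark() == Long.MIN_VALUE) {
        Thread.sleep(1000); // poll until a snapshot with a real watermark lands
    }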
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 3cbe76053..d4d69e6e8 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -218,4 +220,38 @@ public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase {
                 Collections.singletonList("+I[101, hammer, {\"row_key\":\"value\"}]");
         waitForResult(expected, table, rowType, primaryKeys);
     }
+
+    @Test
+    @Timeout(60)
+    public void testWaterMarkSyncTable() throws Exception {
+        String topic = "watermark";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, readLines("kafka/debezium/table/watermark/debezium-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+
+        Map<String, String> config = getBasicTableConfig();
+        config.put("tag.automatic-creation", "watermark");
+        config.put("tag.creation-period", "hourly");
+        config.put("scan.watermark.alignment.group", "alignment-group-1");
+        config.put("scan.watermark.alignment.max-drift", "20 s");
+        config.put("scan.watermark.alignment.update-interval", "1 s");
+
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+        runActionWithDefaultEnv(action);
+
+        AbstractFileStoreTable table =
+                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        while (true) {
+            if (table.snapshotManager().snapshotCount() > 0
+                    && table.snapshotManager().latestSnapshot().watermark()
+                            != -9223372036854775808L) {
+                return;
+            }
+            Thread.sleep(1000);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
index 6a8605081..856f66a1e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -502,4 +504,38 @@ public class KafkaMaxwellSyncTableActionITCase extends KafkaActionITCaseBase {
                 rowType,
                 Arrays.asList("_id", "_year"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testWaterMarkSyncTable() throws Exception {
+        String topic = "watermark";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, readLines("kafka/maxwell/table/watermark/maxwell-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+
+        Map<String, String> config = getBasicTableConfig();
+        config.put("tag.automatic-creation", "watermark");
+        config.put("tag.creation-period", "hourly");
+        config.put("scan.watermark.alignment.group", "alignment-group-1");
+        config.put("scan.watermark.alignment.max-drift", "20 s");
+        config.put("scan.watermark.alignment.update-interval", "1 s");
+
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+        runActionWithDefaultEnv(action);
+
+        AbstractFileStoreTable table =
+                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        while (true) {
+            if (table.snapshotManager().snapshotCount() > 0
+                    && table.snapshotManager().latestSnapshot().watermark()
+                            != -9223372036854775808L) {
+                return;
+            }
+            Thread.sleep(1000);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
index 3ca5504cc..1c4497021 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -558,4 +560,38 @@ public class KafkaOggSyncTableActionITCase extends KafkaActionITCaseBase {
                         "+I[102, car battery, 12V car battery, 8.100000381469727]");
         waitForResult(expectedReplace, table, rowType, primaryKeys);
     }
+
+    @Test
+    @Timeout(60)
+    public void testWaterMarkSyncTable() throws Exception {
+        String topic = "watermark";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, readLines("kafka/ogg/table/watermark/ogg-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+
+        Map<String, String> config = getBasicTableConfig();
+        config.put("tag.automatic-creation", "watermark");
+        config.put("tag.creation-period", "hourly");
+        config.put("scan.watermark.alignment.group", "alignment-group-1");
+        config.put("scan.watermark.alignment.max-drift", "20 s");
+        config.put("scan.watermark.alignment.update-interval", "1 s");
+
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+        runActionWithDefaultEnv(action);
+
+        AbstractFileStoreTable table =
+                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        while (true) {
+            if (table.snapshotManager().snapshotCount() > 0
+                    && table.snapshotManager().latestSnapshot().watermark()
+                            != -9223372036854775808L) {
+                return;
+            }
+            Thread.sleep(1000);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
index 22688ae67..6d1a2137b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc.pulsar;
 
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -193,4 +195,44 @@ public class PulsarSyncTableActionITCase extends PulsarActionITCaseBase {
                         "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
         waitForResult(expected, table, rowType, primaryKeys);
     }
+
+    @Test
+    @Timeout(60)
+    public void testWaterMarkSyncTable() throws Exception {
+        String topic = "watermark";
+        topics = Collections.singletonList(topic);
+        createTopic(topic, 1);
+        sendMessages(topic, getMessages("kafka/canal/table/watermark/canal-data-1.txt"));
+
+        Map<String, String> pulsarConfig = getBasicPulsarConfig();
+        pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            pulsarConfig.put(TOPIC.key(), topic);
+        } else {
+            pulsarConfig.put(TOPIC_PATTERN.key(), "schema_.*");
+        }
+        pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
+        Map<String, String> config = getBasicTableConfig();
+        config.put("tag.automatic-creation", "watermark");
+        config.put("tag.creation-period", "hourly");
+
+        PulsarSyncTableAction action =
+                syncTableActionBuilder(pulsarConfig)
+                        .withPartitionKeys("pt")
+                        .withPrimaryKeys("pt", "_id")
+                        .withTableConfig(config)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        AbstractFileStoreTable table =
+                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        while (true) {
+            if (table.snapshotManager().snapshotCount() > 0
+                    && table.snapshotManager().latestSnapshot().watermark()
+                            != -9223372036854775808L) {
+                return;
+            }
+            Thread.sleep(1000);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/watermark/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/watermark/canal-data-1.txt
new file mode 100644
index 000000000..974930b7f
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/watermark/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_1","ts":1683006706728,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"2","v1":"two"}],"database":"paimon_sync_table","es":1683006806000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_1","ts":1683011381000,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/watermark/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/watermark/debezium-data-1.txt
new file mode 100644
index 000000000..b3ff4e23a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/watermark/debezium-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/watermark/maxwell-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/watermark/maxwell-data-1.txt
new file mode 100644
index 000000000..58a04a82a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/watermark/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"primary_key_columns": ["id"]}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/watermark/ogg-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/watermark/ogg-data-1.txt
new file mode 100644
index 000000000..5b6ef1085
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/watermark/ogg-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000143","primary_keys":["id"],"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I", "current_ts":"2020-05-13T13:39:35.766000", "op_ts":"2020-05-13 15:40:06.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000144","primary_keys":["id"],"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}