Posted to commits@paimon.apache.org by yu...@apache.org on 2024/01/10 08:02:38 UTC
(incubator-paimon) branch master updated: [flink] CDC ingestion supports watermark strategy (#2640)
This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7992ee910 [flink] CDC ingestion supports watermark strategy (#2640)
7992ee910 is described below
commit 7992ee910445c2f9a0466538e3747dcd92b146b5
Author: monster <60...@users.noreply.github.com>
AuthorDate: Wed Jan 10 16:02:32 2024 +0800
[flink] CDC ingestion supports watermark strategy (#2640)
---
.../org/apache/paimon/utils/JsonSerdeUtil.java | 25 +++++
.../action/cdc/SynchronizationActionBase.java | 37 ++++++-
.../watermark/CdcTimestampExtractorFactory.java | 121 +++++++++++++++++++++
.../action/cdc/watermark/CdcWatermarkStrategy.java | 70 ++++++++++++
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 36 ++++++
.../kafka/KafkaDebeziumSyncTableActionITCase.java | 36 ++++++
.../kafka/KafkaMaxwellSyncTableActionITCase.java | 36 ++++++
.../cdc/kafka/KafkaOggSyncTableActionITCase.java | 36 ++++++
.../cdc/pulsar/PulsarSyncTableActionITCase.java | 42 +++++++
.../kafka/canal/table/watermark/canal-data-1.txt | 20 ++++
.../debezium/table/watermark/debezium-data-1.txt | 20 ++++
.../maxwell/table/watermark/maxwell-data-1.txt | 20 ++++
.../kafka/ogg/table/watermark/ogg-data-1.txt | 20 ++++
13 files changed, 518 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 1477d55ce..19a57ac58 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -208,6 +208,31 @@ public class JsonSerdeUtil {
return clazz.cast(resultNode);
}
+ /** Parses a JSON string and extracts a value of the specified type from the given path keys. */
+ public static <T> T extractValue(String json, Class<T> valueType, String... path)
+ throws JsonProcessingException {
+ JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
+ for (String key : path) {
+ currentNode = currentNode.get(key);
+ if (currentNode == null) {
+ throw new IllegalArgumentException("Invalid path or key not found: " + key);
+ }
+ }
+ return OBJECT_MAPPER_INSTANCE.treeToValue(currentNode, valueType);
+ }
+
+ /** Checks if a specified node exists in a JSON string. */
+ public static boolean isNodeExists(String json, String... path) throws JsonProcessingException {
+ JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
+ for (String key : path) {
+ currentNode = currentNode.get(key);
+ if (currentNode == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
public static boolean isNull(JsonNode jsonNode) {
return jsonNode == null || jsonNode.isNull();
}
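For reference, a minimal sketch of how the two new helpers are meant to be used (the JSON literal below is illustrative, not taken from the commit):

    // Hypothetical Debezium-style record, used only for illustration.
    String json = "{\"payload\":{\"ts_ms\":1596684883000}}";
    // Walks the key path "payload" -> "ts_ms" and converts the leaf node to Long.
    Long tsMs = JsonSerdeUtil.extractValue(json, Long.class, "payload", "ts_ms");
    // Returns true only if every key on the path resolves to a node.
    boolean present = JsonSerdeUtil.isNodeExists(json, "payload", "ts_ms");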
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 3cc77e6f5..6d5344f5c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -18,11 +18,14 @@
package org.apache.paimon.flink.action.cdc;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.options.Options;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -32,11 +35,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.createExtractor;
+
/** Base {@link Action} for table/database synchronizing job. */
public abstract class SynchronizationActionBase extends ActionBase {
@@ -123,9 +135,32 @@ public abstract class SynchronizationActionBase extends ActionBase {
private DataStreamSource<String> buildDataStreamSource(Object source) {
if (source instanceof Source) {
+ boolean isAutomaticWatermarkCreationEnabled =
+ tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
+ && Objects.equals(
+ tableConfig.get(CoreOptions.TAG_AUTOMATIC_CREATION.key()),
+ WATERMARK.toString());
+
+ Options options = Options.fromMap(tableConfig);
+ Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
+ String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
+ WatermarkStrategy<String> watermarkStrategy =
+ isAutomaticWatermarkCreationEnabled
+ ? watermarkAlignGroup != null
+ ? new CdcWatermarkStrategy(createExtractor(source))
+ .withWatermarkAlignment(
+ watermarkAlignGroup,
+ options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
+ options.get(
+ SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL))
+ : new CdcWatermarkStrategy(createExtractor(source))
+ : WatermarkStrategy.noWatermarks();
+ if (idleTimeout != null) {
+ watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
+ }
return env.fromSource(
(Source<String, ?, ?>) source,
- WatermarkStrategy.noWatermarks(),
+ watermarkStrategy,
syncJobHandler.provideSourceName());
}
if (source instanceof SourceFunction) {
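The nested ternary above is dense; an equivalent unrolled form (a readability sketch only, not code from the commit) reads:

    WatermarkStrategy<String> watermarkStrategy;
    if (!isAutomaticWatermarkCreationEnabled) {
        // Tag creation is not watermark-driven, so no watermarks are generated.
        watermarkStrategy = WatermarkStrategy.noWatermarks();
    } else if (watermarkAlignGroup != null) {
        // Align watermarks across all sources that share the group name.
        watermarkStrategy =
                new CdcWatermarkStrategy(createExtractor(source))
                        .withWatermarkAlignment(
                                watermarkAlignGroup,
                                options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
    } else {
        watermarkStrategy = new CdcWatermarkStrategy(createExtractor(source));
    }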
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
new file mode 100644
index 000000000..0e91353f9
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.watermark;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/** Factory for creating CDC timestamp extractors based on different source types. */
+public class CdcTimestampExtractorFactory implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Map<Class<?>, Supplier<CdcTimestampExtractor>> extractorMap =
+ new HashMap<>();
+
+ static {
+ extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new);
+ extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new);
+ extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new);
+ extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new);
+ }
+
+ public static CdcTimestampExtractor createExtractor(Object source) {
+ Supplier<CdcTimestampExtractor> extractorSupplier = extractorMap.get(source.getClass());
+ if (extractorSupplier != null) {
+ return extractorSupplier.get();
+ }
+ throw new IllegalArgumentException(
+ "Unsupported source type: " + source.getClass().getName());
+ }
+
+ /** Timestamp extractor for MongoDB sources in CDC applications. */
+ public static class MongoDBCdcTimestampExtractor implements CdcTimestampExtractor {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long extractTimestamp(String record) throws JsonProcessingException {
+ return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
+ }
+ }
+
+ /** Timestamp extractor for Kafka/Pulsar sources in CDC applications. */
+ public static class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long extractTimestamp(String record) throws JsonProcessingException {
+ if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
+ // Canal json
+ return JsonSerdeUtil.extractValue(record, Long.class, "ts");
+ } else if (JsonSerdeUtil.isNodeExists(record, "pos")) {
+ // Ogg json
+ String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts");
+ DateTimeFormatter formatter =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+ LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
+ return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+ } else if (JsonSerdeUtil.isNodeExists(record, "xid")) {
+ // Maxwell json
+ return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000;
+ } else if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) {
+ // Debezium json (with schema, so fields sit under "payload")
+ return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
+ } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
+ // Debezium json (without schema)
+ return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
+ }
+ throw new RuntimeException(
+ String.format(
+ "Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s",
+ record));
+ }
+ }
+
+ /** Timestamp extractor for MySQL sources in CDC applications. */
+ public static class MysqlCdcTimestampExtractor implements CdcTimestampExtractor {
+
+ @Override
+ public long extractTimestamp(String record) throws JsonProcessingException {
+ return JsonSerdeUtil.extractValue(record, Long.class, "source", "ts_ms");
+ }
+ }
+
+ /** Interface defining the contract for CDC timestamp extraction. */
+ public interface CdcTimestampExtractor extends Serializable {
+
+ long extractTimestamp(String record) throws JsonProcessingException;
+ }
+}
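A short usage sketch of the factory's message-queue extractor (the Maxwell-style record below is made up for illustration; Maxwell is detected via its "xid" field and carries a second-precision "ts"):

    String record = "{\"ts\":1596684883,\"xid\":7125,\"data\":{\"id\":101}}";
    CdcTimestampExtractorFactory.CdcTimestampExtractor extractor =
            new CdcTimestampExtractorFactory.MessageQueueCdcTimestampExtractor();
    // Maxwell's "ts" is in seconds, so the extractor scales it to milliseconds.
    long tsMillis = extractor.extractTimestamp(record); // 1596684883 * 1000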
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
new file mode 100644
index 000000000..218d45d63
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.watermark;
+
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.CdcTimestampExtractor;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+
+/**
+ * Watermark strategy for CDC sources, generating watermarks based on timestamps extracted from
+ * records.
+ */
+public class CdcWatermarkStrategy implements WatermarkStrategy<String> {
+
+ private final CdcTimestampExtractor timestampExtractor;
+ private static final long serialVersionUID = 1L;
+ private long currentMaxTimestamp;
+
+ public CdcWatermarkStrategy(CdcTimestampExtractor extractor) {
+ this.timestampExtractor = extractor;
+ }
+
+ @Override
+ public WatermarkGenerator<String> createWatermarkGenerator(
+ WatermarkGeneratorSupplier.Context context) {
+ return new WatermarkGenerator<String>() {
+
+ @Override
+ public void onEvent(String record, long timestamp, WatermarkOutput output) {
+ long tMs;
+ try {
+ tMs = timestampExtractor.extractTimestamp(record);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ currentMaxTimestamp = Math.max(currentMaxTimestamp, tMs);
+ output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
+ }
+
+ @Override
+ public void onPeriodicEmit(WatermarkOutput output) {
+ long timeMillis = System.currentTimeMillis();
+ currentMaxTimestamp = Math.max(timeMillis, currentMaxTimestamp);
+ output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
+ }
+ };
+ }
+}
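Wiring the strategy into a job mirrors what SynchronizationActionBase does above; a sketch assuming an existing StreamExecutionEnvironment 'env' and a KafkaSource<String> 'kafkaSource':

    WatermarkStrategy<String> strategy =
            new CdcWatermarkStrategy(
                    CdcTimestampExtractorFactory.createExtractor(kafkaSource));
    // Watermarks now advance with CDC record timestamps instead of being disabled.
    DataStreamSource<String> stream =
            env.fromSource(kafkaSource, strategy, "kafka-cdc-source");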
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index ed8930d41..e81ed986d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -1241,4 +1243,38 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
rowType,
Collections.singletonList("_id"));
}
+
+ @Test
+ @Timeout(60)
+ public void testWaterMarkSyncTable() throws Exception {
+ String topic = "watermark";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, readLines("kafka/canal/table/watermark/canal-data-1.txt"));
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+
+ Map<String, String> config = getBasicTableConfig();
+ config.put("tag.automatic-creation", "watermark");
+ config.put("tag.creation-period", "hourly");
+ config.put("scan.watermark.alignment.group", "alignment-group-1");
+ config.put("scan.watermark.alignment.max-drift", "20 s");
+ config.put("scan.watermark.alignment.update-interval", "1 s");
+
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+ runActionWithDefaultEnv(action);
+
+ AbstractFileStoreTable table =
+ (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+ while (true) {
+ if (table.snapshotManager().snapshotCount() > 0
+ && table.snapshotManager().latestSnapshot().watermark()
+ != -9223372036854775808L) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ }
}
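The literal -9223372036854775808L in the polling loop above (and in the sibling tests below) is Long.MIN_VALUE, the sentinel a snapshot carries before any watermark has been recorded; the same wait could be written more explicitly as (a sketch, not code from the commit):

    // Poll until some snapshot exists whose watermark has advanced past
    // the Long.MIN_VALUE "no watermark yet" sentinel.
    while (table.snapshotManager().snapshotCount() == 0
            || table.snapshotManager().latestSnapshot().watermark() == Long.MIN_VALUE) {
        Thread.sleep(1000);
    }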
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 3cbe76053..d4d69e6e8 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -218,4 +220,38 @@ public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase {
Collections.singletonList("+I[101, hammer, {\"row_key\":\"value\"}]");
waitForResult(expected, table, rowType, primaryKeys);
}
+
+ @Test
+ @Timeout(60)
+ public void testWaterMarkSyncTable() throws Exception {
+ String topic = "watermark";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, readLines("kafka/debezium/table/watermark/debezium-data-1.txt"));
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+
+ Map<String, String> config = getBasicTableConfig();
+ config.put("tag.automatic-creation", "watermark");
+ config.put("tag.creation-period", "hourly");
+ config.put("scan.watermark.alignment.group", "alignment-group-1");
+ config.put("scan.watermark.alignment.max-drift", "20 s");
+ config.put("scan.watermark.alignment.update-interval", "1 s");
+
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+ runActionWithDefaultEnv(action);
+
+ AbstractFileStoreTable table =
+ (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+ while (true) {
+ if (table.snapshotManager().snapshotCount() > 0
+ && table.snapshotManager().latestSnapshot().watermark()
+ != -9223372036854775808L) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
index 6a8605081..856f66a1e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -502,4 +504,38 @@ public class KafkaMaxwellSyncTableActionITCase extends KafkaActionITCaseBase {
rowType,
Arrays.asList("_id", "_year"));
}
+
+ @Test
+ @Timeout(60)
+ public void testWaterMarkSyncTable() throws Exception {
+ String topic = "watermark";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, readLines("kafka/maxwell/table/watermark/maxwell-data-1.txt"));
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+
+ Map<String, String> config = getBasicTableConfig();
+ config.put("tag.automatic-creation", "watermark");
+ config.put("tag.creation-period", "hourly");
+ config.put("scan.watermark.alignment.group", "alignment-group-1");
+ config.put("scan.watermark.alignment.max-drift", "20 s");
+ config.put("scan.watermark.alignment.update-interval", "1 s");
+
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+ runActionWithDefaultEnv(action);
+
+ AbstractFileStoreTable table =
+ (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+ while (true) {
+ if (table.snapshotManager().snapshotCount() > 0
+ && table.snapshotManager().latestSnapshot().watermark()
+ != -9223372036854775808L) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
index 3ca5504cc..1c4497021 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -558,4 +560,38 @@ public class KafkaOggSyncTableActionITCase extends KafkaActionITCaseBase {
"+I[102, car battery, 12V car battery, 8.100000381469727]");
waitForResult(expectedReplace, table, rowType, primaryKeys);
}
+
+ @Test
+ @Timeout(60)
+ public void testWaterMarkSyncTable() throws Exception {
+ String topic = "watermark";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, readLines("kafka/ogg/table/watermark/ogg-data-1.txt"));
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+
+ Map<String, String> config = getBasicTableConfig();
+ config.put("tag.automatic-creation", "watermark");
+ config.put("tag.creation-period", "hourly");
+ config.put("scan.watermark.alignment.group", "alignment-group-1");
+ config.put("scan.watermark.alignment.max-drift", "20 s");
+ config.put("scan.watermark.alignment.update-interval", "1 s");
+
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+ runActionWithDefaultEnv(action);
+
+ AbstractFileStoreTable table =
+ (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+ while (true) {
+ if (table.snapshotManager().snapshotCount() > 0
+ && table.snapshotManager().latestSnapshot().watermark()
+ != -9223372036854775808L) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
index 22688ae67..6d1a2137b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action.cdc.pulsar;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -193,4 +195,44 @@ public class PulsarSyncTableActionITCase extends PulsarActionITCaseBase {
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
waitForResult(expected, table, rowType, primaryKeys);
}
+
+ @Test
+ @Timeout(60)
+ public void testWaterMarkSyncTable() throws Exception {
+ String topic = "watermark";
+ topics = Collections.singletonList(topic);
+ createTopic(topic, 1);
+ sendMessages(topic, getMessages("kafka/canal/table/watermark/canal-data-1.txt"));
+
+ Map<String, String> pulsarConfig = getBasicPulsarConfig();
+ pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ pulsarConfig.put(TOPIC.key(), topic);
+ } else {
+ pulsarConfig.put(TOPIC_PATTERN.key(), "schema_.*");
+ }
+ pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
+ Map<String, String> config = getBasicTableConfig();
+ config.put("tag.automatic-creation", "watermark");
+ config.put("tag.creation-period", "hourly");
+
+ PulsarSyncTableAction action =
+ syncTableActionBuilder(pulsarConfig)
+ .withPartitionKeys("pt")
+ .withPrimaryKeys("pt", "_id")
+ .withTableConfig(config)
+ .build();
+ runActionWithDefaultEnv(action);
+
+ AbstractFileStoreTable table =
+ (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+ while (true) {
+ if (table.snapshotManager().snapshotCount() > 0
+ && table.snapshotManager().latestSnapshot().watermark()
+ != -9223372036854775808L) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/watermark/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/watermark/canal-data-1.txt
new file mode 100644
index 000000000..974930b7f
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/watermark/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_1","ts":1683006706728,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"2","v1":"two"}],"database":"paimon_sync_table","es":1683006806000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_1","ts":1683011381000,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/watermark/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/watermark/debezium-data-1.txt
new file mode 100644
index 000000000..b3ff4e23a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/watermark/debezium-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/watermark/maxwell-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/watermark/maxwell-data-1.txt
new file mode 100644
index 000000000..58a04a82a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/watermark/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"primary_key_columns": ["id"]}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/watermark/ogg-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/watermark/ogg-data-1.txt
new file mode 100644
index 000000000..5b6ef1085
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/table/watermark/ogg-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000143","primary_keys":["id"],"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I", "current_ts":"2020-05-13T13:39:35.766000", "op_ts":"2020-05-13 15:40:06.000000"}
+{"table":"OGG.TBL_TEST","pos":"00000000000000000000144","primary_keys":["id"],"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}