You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by ke...@apache.org on 2023/12/21 04:32:43 UTC
(incubator-paimon) branch master updated: [cdc] Support 'topic-pattern' for kafka and pulsar cdc (#2544)
This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 79af94db9 [cdc] Support 'topic-pattern' for kafka and pulsar cdc (#2544)
79af94db9 is described below
commit 79af94db99b2702719132210535766c2661e64bc
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Thu Dec 21 12:32:39 2023 +0800
[cdc] Support 'topic-pattern' for kafka and pulsar cdc (#2544)
---
docs/content/cdc-ingestion/pulsar-cdc.md | 9 +++++++++
.../shortcodes/generated/kafka_sync_database.html | 2 +-
docs/layouts/shortcodes/generated/kafka_sync_table.html | 2 +-
.../shortcodes/generated/pulsar_sync_database.html | 2 +-
docs/layouts/shortcodes/generated/pulsar_sync_table.html | 2 +-
.../paimon/flink/action/cdc/CdcActionCommonUtils.java | 12 ++++++++++++
.../apache/paimon/flink/action/cdc/SyncJobHandler.java | 13 +++++++++++--
.../paimon/flink/action/cdc/kafka/KafkaActionUtils.java | 16 +++++++++++-----
.../flink/action/cdc/pulsar/PulsarActionUtils.java | 14 +++++++++++++-
.../cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java | 10 ++++++++--
.../cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java | 2 +-
.../cdc/kafka/KafkaOggSyncDatabaseActionITCase.java | 2 +-
.../cdc/pulsar/PulsarSyncDatabaseActionITCase.java | 9 ++++++++-
13 files changed, 78 insertions(+), 17 deletions(-)
diff --git a/docs/content/cdc-ingestion/pulsar-cdc.md b/docs/content/cdc-ingestion/pulsar-cdc.md
index b89151358..7999ac16b 100644
--- a/docs/content/cdc-ingestion/pulsar-cdc.md
+++ b/docs/content/cdc-ingestion/pulsar-cdc.md
@@ -257,6 +257,15 @@ There are some useful options to build Flink Pulsar Source, but they are not pro
like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified.
</td>
</tr>
+ <tr>
+ <td>topic-pattern</td>
+ <td>(none)</td>
+ <td>String</td>
+ <td>The regular expression for a pattern of topic names to read from. All topics with names that match the
+ specified regular expression will be subscribed by the consumer when the job starts running. Note, only
+ one of "topic-pattern" and "topic" can be specified.
+ </td>
+ </tr>
<tr>
<td>pulsar.startCursor.fromMessageId</td>
<td>EARLIEST</td>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 8f5709bb7..2b2533b93 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -71,7 +71,7 @@ under the License.
</tr>
<tr>
<td><h5>--kafka_conf</h5></td>
- <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic`, `properties.group.id`, and `value.format` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
+ <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, exactly one of `topic` and `topic-pattern`, `properties.group.id`, and `value.format` are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index ce4207cc8..765aade20 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -67,7 +67,7 @@ under the License.
</tr>
<tr>
<td><h5>--kafka_conf</h5></td>
- <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic`, `properties.group.id`, and `value.format` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
+ <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, exactly one of `topic` and `topic-pattern`, `properties.group.id`, and `value.format` are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_database.html b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
index 4ff120c35..307e1b41c 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_database.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
@@ -71,7 +71,7 @@ under the License.
</tr>
<tr>
<td><h5>--pulsar_conf</h5></td>
- <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
+ <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. Exactly one of `topic` and `topic-pattern`, plus `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_table.html b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
index 1399b5355..2fcd9f7eb 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_table.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
@@ -67,7 +67,7 @@ under the License.
</tr>
<tr>
<td><h5>--pulsar_conf</h5></td>
- <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
+ <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. Exactly one of `topic` and `topic-pattern`, plus `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog_conf</h5></td>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index b3fdab390..d4a02ad54 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -312,4 +313,15 @@ public class CdcActionCommonUtils {
configOption.key());
}
}
+
+ public static void checkOneRequiredOption( // Validates that exactly one of configOptions is present in config; otherwise checkArgument fails (IllegalArgumentException, per the ITCase assertions).
+ Configuration config, String confName, ConfigOption<?>... configOptions) { // confName (e.g. KAFKA_CONF / PULSAR_CONF) is used only to prefix the error message.
+ checkArgument(
+ Arrays.stream(configOptions).filter(config::contains).count() == 1, // fails both when none and when more than one of the options is set
+ "%s must and can only set one of the following options: %s.",
+ confName,
+ Arrays.stream(configOptions)
+ .map(ConfigOption::key)
+ .collect(Collectors.joining(","))); // keys joined with "," and no space, e.g. "topic,topic-pattern" (tests match this text)
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index e469723b1..60acc5f7e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -42,6 +42,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkOneRequiredOption;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkRequiredOptions;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -115,18 +116,26 @@ public class SyncJobHandler {
cdcSourceConfig,
KAFKA_CONF,
KafkaConnectorOptions.VALUE_FORMAT,
- KafkaConnectorOptions.TOPIC,
KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
+ checkOneRequiredOption(
+ cdcSourceConfig,
+ KAFKA_CONF,
+ KafkaConnectorOptions.TOPIC,
+ KafkaConnectorOptions.TOPIC_PATTERN);
break;
case PULSAR:
checkRequiredOptions(
cdcSourceConfig,
PULSAR_CONF,
- PulsarActionUtils.TOPIC,
PulsarActionUtils.VALUE_FORMAT,
PulsarOptions.PULSAR_SERVICE_URL,
PulsarOptions.PULSAR_ADMIN_URL,
PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
+ checkOneRequiredOption(
+ cdcSourceConfig,
+ PULSAR_CONF,
+ PulsarActionUtils.TOPIC,
+ PulsarActionUtils.TOPIC_PATTERN);
break;
case MONGODB:
checkRequiredOptions(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 996289078..e0dcfb5b5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -51,6 +51,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -67,13 +68,18 @@ public class KafkaActionUtils {
public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
- List<String> topics =
- kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream()
- .flatMap(topic -> Arrays.stream(topic.split(",")))
- .collect(Collectors.toList());
+ if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
+ List<String> topics =
+ kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream()
+ .flatMap(topic -> Arrays.stream(topic.split(",")))
+ .collect(Collectors.toList());
+ kafkaSourceBuilder.setTopics(topics);
+ } else {
+ kafkaSourceBuilder.setTopicPattern(
+ Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
+ }
kafkaSourceBuilder
- .setTopics(topics)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
Properties properties = createKafkaProperties(kafkaConfig);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 0f8a17625..0a8bd7860 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -79,6 +79,16 @@ public class PulsarActionUtils {
+ "by semicolon like 'topic-1;topic-2'. Note, only one of \"topic-pattern\" and \"topic\" "
+ "can be specified.");
+ public static final ConfigOption<String> TOPIC_PATTERN = // Alternative to TOPIC: SyncJobHandler requires exactly one of the two (checkOneRequiredOption); the source builder calls setTopicPattern only when this is present.
+ ConfigOptions.key("topic-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The regular expression for a pattern of topic names to read from. All topics with names "
+ + "that match the specified regular expression will be subscribed by the consumer "
+ + "when the job starts running. Note, only one of \"topic-pattern\" and \"topic\" "
+ + "can be specified.");
+
static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID =
ConfigOptions.key("pulsar.startCursor.fromMessageId")
.stringType()
@@ -151,9 +161,11 @@ public class PulsarActionUtils {
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
- .setTopics(pulsarConfig.get(TOPIC))
.setDeserializationSchema(new SimpleStringSchema());
+ pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
+ pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
+
// other settings
// consumer name
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index fdc946565..d9530d85a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -37,6 +37,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
@@ -74,7 +75,12 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
- kafkaConfig.put(TOPIC.key(), String.join(";", topics));
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
+ } else {
+ kafkaConfig.put(TOPIC_PATTERN.key(), "schema_evolution_.+");
+ }
+
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
@@ -252,7 +258,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "kafka_conf [topic] must be specified."));
+ "kafka_conf must and can only set one of the following options: topic,topic-pattern."));
}
@Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
index 60ce40773..29ef6451d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
@@ -214,7 +214,7 @@ public class KafkaMaxwellSyncDatabaseActionITCase extends KafkaActionITCaseBase
.satisfies(
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
- "kafka_conf [topic] must be specified."));
+ "kafka_conf must and can only set one of the following options: topic,topic-pattern."));
}
@Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
index c930e1054..07ea1e89f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
@@ -215,7 +215,7 @@ public class KafkaOggSyncDatabaseActionITCase extends KafkaActionITCaseBase {
.satisfies(
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
- "kafka_conf [topic] must be specified."));
+ "kafka_conf must and can only set one of the following options: topic,topic-pattern."));
}
@Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
index dbd3db5d5..37e6b66bf 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
@@ -30,9 +30,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.TOPIC;
+import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.TOPIC_PATTERN;
import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.VALUE_FORMAT;
/** IT cases for {@link PulsarSyncDatabaseAction}. */
@@ -67,7 +69,12 @@ public class PulsarSyncDatabaseActionITCase extends PulsarActionITCaseBase {
Map<String, String> pulsarConfig = getBasicPulsarConfig();
pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
- pulsarConfig.put(TOPIC.key(), String.join(";", topics));
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ pulsarConfig.put(TOPIC.key(), String.join(";", topics));
+ } else {
+ pulsarConfig.put(TOPIC_PATTERN.key(), "schema_evolution_.+");
+ }
+
PulsarSyncDatabaseAction action =
syncDatabaseActionBuilder(pulsarConfig)
.withTableConfig(getBasicTableConfig())