You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by ke...@apache.org on 2023/12/21 04:32:43 UTC

(incubator-paimon) branch master updated: [cdc] Support 'topic-pattern' for kafka and pulsar cdc (#2544)

This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 79af94db9 [cdc] Support 'topic-pattern' for kafka and pulsar cdc (#2544)
79af94db9 is described below

commit 79af94db99b2702719132210535766c2661e64bc
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Thu Dec 21 12:32:39 2023 +0800

    [cdc] Support 'topic-pattern' for kafka and pulsar cdc (#2544)
---
 docs/content/cdc-ingestion/pulsar-cdc.md                 |  9 +++++++++
 .../shortcodes/generated/kafka_sync_database.html        |  2 +-
 docs/layouts/shortcodes/generated/kafka_sync_table.html  |  2 +-
 .../shortcodes/generated/pulsar_sync_database.html       |  2 +-
 docs/layouts/shortcodes/generated/pulsar_sync_table.html |  2 +-
 .../paimon/flink/action/cdc/CdcActionCommonUtils.java    | 12 ++++++++++++
 .../apache/paimon/flink/action/cdc/SyncJobHandler.java   | 13 +++++++++++--
 .../paimon/flink/action/cdc/kafka/KafkaActionUtils.java  | 16 +++++++++++-----
 .../flink/action/cdc/pulsar/PulsarActionUtils.java       | 14 +++++++++++++-
 .../cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java    | 10 ++++++++--
 .../cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java  |  2 +-
 .../cdc/kafka/KafkaOggSyncDatabaseActionITCase.java      |  2 +-
 .../cdc/pulsar/PulsarSyncDatabaseActionITCase.java       |  9 ++++++++-
 13 files changed, 78 insertions(+), 17 deletions(-)

diff --git a/docs/content/cdc-ingestion/pulsar-cdc.md b/docs/content/cdc-ingestion/pulsar-cdc.md
index b89151358..7999ac16b 100644
--- a/docs/content/cdc-ingestion/pulsar-cdc.md
+++ b/docs/content/cdc-ingestion/pulsar-cdc.md
@@ -257,6 +257,15 @@ There are some useful options to build Flink Pulsar Source, but they are not pro
               like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified.
           </td>
         </tr>
+        <tr>
+          <td>topic-pattern</td>
+          <td>(none)</td>
+          <td>String</td>
+          <td>The regular expression for a pattern of topic names to read from. All topics whose names match the
+              specified regular expression will be subscribed by the consumer when the job starts running. Note, only
+              one of "topic-pattern" and "topic" can be specified.
+          </td>
+        </tr>
         <tr>
           <td>pulsar.startCursor.fromMessageId</td>
           <td>EARLIEST</td>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 8f5709bb7..2b2533b93 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -71,7 +71,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--kafka_conf</h5></td>
-        <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic`, `properties.group.id`,  and `value.format` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
+        <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, exactly one of `topic` and `topic-pattern`, `properties.group.id`, and `value.format` are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index ce4207cc8..765aade20 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -67,7 +67,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--kafka_conf</h5></td>
-        <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic`, `properties.group.id`,  and `value.format` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
+        <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, exactly one of `topic` and `topic-pattern`, `properties.group.id`, and `value.format` are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_database.html b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
index 4ff120c35..307e1b41c 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_database.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
@@ -71,7 +71,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--pulsar_conf</h5></td>
-        <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
+        <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. Exactly one of `topic` and `topic-pattern`, plus `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName`, are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_table.html b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
index 1399b5355..2fcd9f7eb 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_table.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
@@ -67,7 +67,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--pulsar_conf</h5></td>
-        <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
+        <td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. Exactly one of `topic` and `topic-pattern`, plus `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName`, are required configurations; others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog_conf</h5></td>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index b3fdab390..d4a02ad54 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -312,4 +313,15 @@ public class CdcActionCommonUtils {
                     configOption.key());
         }
     }
+
+    public static void checkOneRequiredOption(
+            Configuration config, String confName, ConfigOption<?>... configOptions) {
+        checkArgument(
+                Arrays.stream(configOptions).filter(config::contains).count() == 1,
+                "%s must and can only set one of the following options: %s.",
+                confName,
+                Arrays.stream(configOptions)
+                        .map(ConfigOption::key)
+                        .collect(Collectors.joining(",")));
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index e469723b1..60acc5f7e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -42,6 +42,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkOneRequiredOption;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkRequiredOptions;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -115,18 +116,26 @@ public class SyncJobHandler {
                         cdcSourceConfig,
                         KAFKA_CONF,
                         KafkaConnectorOptions.VALUE_FORMAT,
-                        KafkaConnectorOptions.TOPIC,
                         KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
+                checkOneRequiredOption(
+                        cdcSourceConfig,
+                        KAFKA_CONF,
+                        KafkaConnectorOptions.TOPIC,
+                        KafkaConnectorOptions.TOPIC_PATTERN);
                 break;
             case PULSAR:
                 checkRequiredOptions(
                         cdcSourceConfig,
                         PULSAR_CONF,
-                        PulsarActionUtils.TOPIC,
                         PulsarActionUtils.VALUE_FORMAT,
                         PulsarOptions.PULSAR_SERVICE_URL,
                         PulsarOptions.PULSAR_ADMIN_URL,
                         PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
+                checkOneRequiredOption(
+                        cdcSourceConfig,
+                        PULSAR_CONF,
+                        PulsarActionUtils.TOPIC,
+                        PulsarActionUtils.TOPIC_PATTERN);
                 break;
             case MONGODB:
                 checkRequiredOptions(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 996289078..e0dcfb5b5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -51,6 +51,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -67,13 +68,18 @@ public class KafkaActionUtils {
     public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
         KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
 
-        List<String> topics =
-                kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream()
-                        .flatMap(topic -> Arrays.stream(topic.split(",")))
-                        .collect(Collectors.toList());
+        if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
+            List<String> topics =
+                    kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream()
+                            .flatMap(topic -> Arrays.stream(topic.split(",")))
+                            .collect(Collectors.toList());
+            kafkaSourceBuilder.setTopics(topics);
+        } else {
+            kafkaSourceBuilder.setTopicPattern(
+                    Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
+        }
 
         kafkaSourceBuilder
-                .setTopics(topics)
                 .setValueOnlyDeserializer(new SimpleStringSchema())
                 .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
         Properties properties = createKafkaProperties(kafkaConfig);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 0f8a17625..0a8bd7860 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -79,6 +79,16 @@ public class PulsarActionUtils {
                                     + "by semicolon like 'topic-1;topic-2'. Note, only one of \"topic-pattern\" and \"topic\" "
                                     + "can be specified.");
 
+    public static final ConfigOption<String> TOPIC_PATTERN =
+            ConfigOptions.key("topic-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The regular expression for a pattern of topic names to read from. All topics with names "
+                                    + "that match the specified regular expression will be subscribed by the consumer "
+                                    + "when the job starts running. Note, only one of \"topic-pattern\" and \"topic\" "
+                                    + "can be specified.");
+
     static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID =
             ConfigOptions.key("pulsar.startCursor.fromMessageId")
                     .stringType()
@@ -151,9 +161,11 @@ public class PulsarActionUtils {
                 .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
                 .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
                 .setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
-                .setTopics(pulsarConfig.get(TOPIC))
                 .setDeserializationSchema(new SimpleStringSchema());
 
+        pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
+        pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
+
         // other settings
 
         // consumer name
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index fdc946565..d9530d85a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -37,6 +37,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
 import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
@@ -74,7 +75,12 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
-        kafkaConfig.put(TOPIC.key(), String.join(";", topics));
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            kafkaConfig.put(TOPIC.key(), String.join(";", topics));
+        } else {
+            kafkaConfig.put(TOPIC_PATTERN.key(), "schema_evolution_.+");
+        }
+
         KafkaSyncDatabaseAction action =
                 syncDatabaseActionBuilder(kafkaConfig)
                         .withTableConfig(getBasicTableConfig())
@@ -252,7 +258,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                 .satisfies(
                         anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "kafka_conf [topic] must be specified."));
+                                "kafka_conf must and can only set one of the following options: topic,topic-pattern."));
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
index 60ce40773..29ef6451d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
@@ -214,7 +214,7 @@ public class KafkaMaxwellSyncDatabaseActionITCase extends KafkaActionITCaseBase
                 .satisfies(
                         AssertionUtils.anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "kafka_conf [topic] must be specified."));
+                                "kafka_conf must and can only set one of the following options: topic,topic-pattern."));
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
index c930e1054..07ea1e89f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
@@ -215,7 +215,7 @@ public class KafkaOggSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                 .satisfies(
                         AssertionUtils.anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "kafka_conf [topic] must be specified."));
+                                "kafka_conf must and can only set one of the following options: topic,topic-pattern."));
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
index dbd3db5d5..37e6b66bf 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
@@ -30,9 +30,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
 import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.TOPIC;
+import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.TOPIC_PATTERN;
 import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.VALUE_FORMAT;
 
 /** IT cases for {@link PulsarSyncDatabaseAction}. */
@@ -67,7 +69,12 @@ public class PulsarSyncDatabaseActionITCase extends PulsarActionITCaseBase {
         Map<String, String> pulsarConfig = getBasicPulsarConfig();
         pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
         pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
-        pulsarConfig.put(TOPIC.key(), String.join(";", topics));
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            pulsarConfig.put(TOPIC.key(), String.join(";", topics));
+        } else {
+            pulsarConfig.put(TOPIC_PATTERN.key(), "schema_evolution_.+");
+        }
+
         PulsarSyncDatabaseAction action =
                 syncDatabaseActionBuilder(pulsarConfig)
                         .withTableConfig(getBasicTableConfig())