Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/25 17:10:20 UTC

[pulsar] branch branch-2.9 updated: [improve][connector] add reader config to `pulsar-io-debezium` and `pulsar-io-kafka-connect-adaptor` (#16675)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new a16f03e93a7 [improve][connector] add reader config to `pulsar-io-debezium` and `pulsar-io-kafka-connect-adaptor` (#16675)
a16f03e93a7 is described below

commit a16f03e93a75b597f926d0beddf08cf1fedb948d
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Mon Jul 25 16:44:41 2022 +0000

    [improve][connector] add reader config to `pulsar-io-debezium` and `pulsar-io-kafka-connect-adaptor` (#16675)
    
    (cherry picked from commit 9c199761387dcad97316f482b9ceeaf1980a3334)
---
 .../org/apache/pulsar/io/common/IOConfigUtils.java | 24 ++++++++---
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  | 48 ++++++++++++++++------
 .../io/debezium/PulsarDatabaseHistoryTest.java     | 30 ++++++++++++--
 pulsar-io/kafka-connect-adaptor/pom.xml            |  6 +++
 .../io/kafka/connect/PulsarKafkaWorkerConfig.java  | 15 ++++++-
 .../io/kafka/connect/PulsarOffsetBackingStore.java | 11 +++++
 .../connect/PulsarOffsetBackingStoreTest.java      | 24 +++++++++--
 7 files changed, 133 insertions(+), 25 deletions(-)
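
In short, both connectors gain an optional reader config expressed as a JSON string of key-value pairs (for example {"subscriptionName":"my-subscription"}). The string is parsed by the new IOConfigUtils.loadConfigFromJsonString helper and handed to the Pulsar ReaderBuilder via loadConf(), so standard reader options can be tuned without code changes; an unparseable string is logged with a warning and ignored. Illustrative sketches of the new surface follow the relevant file diffs below.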

diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index efb72e82567..bfa3cfd1cdb 100644
--- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -18,18 +18,21 @@
  */
 package org.apache.pulsar.io.common;
 
+import static org.apache.commons.lang.StringUtils.isBlank;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Slf4j
 public class IOConfigUtils {
@@ -41,6 +44,15 @@ public class IOConfigUtils {
         return loadWithSecrets(map, clazz, secretName -> sinkContext.getSecret(secretName));
     }
 
+    public static Map<String, Object> loadConfigFromJsonString(String config) throws JsonProcessingException {
+        if (!isBlank(config)) {
+            ObjectMapper mapper = new ObjectMapper();
+            return mapper.readValue(config, new TypeReference<Map<String, Object>>() {
+            });
+        } else {
+            return Collections.emptyMap();
+        }
+    }
 
     private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Function<String, String> secretsGetter) {
         Map<String, Object> configs = new HashMap<>(map);
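
The helper above is shared by both connectors: a blank string yields an empty map, anything else is deserialized into a Map<String, Object> that can be fed to ReaderBuilder#loadConf(). A minimal, self-contained sketch of that behavior (the class name and the plain blank check are illustrative, not from the commit):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Collections;
    import java.util.Map;

    public class ReaderConfigParseSketch {
        public static void main(String[] args) throws Exception {
            String json = "{\"subscriptionName\":\"my-subscription\"}";
            Map<String, Object> conf = (json == null || json.trim().isEmpty())
                    ? Collections.<String, Object>emptyMap()
                    : new ObjectMapper().readValue(json, new TypeReference<Map<String, Object>>() { });
            // This map is what the connectors pass to ReaderBuilder#loadConf(...)
            System.out.println(conf);   // prints {subscriptionName=my-subscription}
        }
    }
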
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index c97e101a397..00a0408873f 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.io.debezium;
 
 import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import io.debezium.annotation.ThreadSafe;
 import io.debezium.config.Configuration;
 import io.debezium.config.Field;
@@ -30,6 +33,8 @@ import io.debezium.relational.history.DatabaseHistoryListener;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.HistoryRecordComparator;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
@@ -77,14 +82,26 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         .withDescription("Pulsar client builder")
         .withValidation(Field::isOptional);
 
-    public static Field.Set ALL_FIELDS = Field.setOf(
+    public static final Field READER_CONFIG = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.reader.config")
+            .withDisplayName("Extra configs of the reader")
+            .withType(Type.STRING)
+            .withWidth(Width.LONG)
+            .withImportance(Importance.HIGH)
+            .withDescription("The configs of the reader for the database schema history topic, "
+                    + "in the form of a JSON string with key-value pairs")
+            .withDefault((String) null)
+            .withValidation(Field::isOptional);
+
+    public static final Field.Set ALL_FIELDS = Field.setOf(
         TOPIC,
         SERVICE_URL,
         CLIENT_BUILDER,
-        DatabaseHistory.NAME);
+        DatabaseHistory.NAME,
+        READER_CONFIG);
 
     private final DocumentReader reader = DocumentReader.defaultReader();
     private String topicName;
+    private Map<String, Object> readerConfigMap = new HashMap<>();
     private String dbHistoryName;
     private ClientBuilder clientBuilder;
     private volatile PulsarClient pulsarClient;
@@ -102,6 +119,12 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
                 + getClass().getSimpleName() + "; check the logs for details");
         }
         this.topicName = config.getString(TOPIC);
+        try {
+            this.readerConfigMap = loadConfigFromJsonString(config.getString(READER_CONFIG));
+        } catch (JsonProcessingException exception) {
+            log.warn("The provided reader configs are invalid, "
+                    + "will not pass any extra config to the reader builder.", exception);
+        }
 
         String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
         if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
@@ -209,11 +232,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
     @Override
     protected void recoverRecords(Consumer<HistoryRecord> records) {
         setupClientIfNeeded();
-        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
-                .topic(topicName)
-                .startMessageId(MessageId.earliest)
-            .create()
-        ) {
+        try (Reader<String> historyReader = createHistoryReader()) {
             log.info("Scanning the database history topic '{}'", topicName);
 
             // Read all messages in the topic ...
@@ -256,11 +275,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
     @Override
     public boolean exists() {
         setupClientIfNeeded();
-        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
-                .topic(topicName)
-                .startMessageId(MessageId.earliest)
-            .create()
-        ) {
+        try (Reader<String> historyReader = createHistoryReader()) {
             return historyReader.hasMessageAvailable();
         } catch (IOException e) {
             log.error("Encountered issues on checking existence of database history", e);
@@ -280,4 +295,13 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         }
         return "Pulsar topic";
     }
+
+    @VisibleForTesting
+    Reader<String> createHistoryReader() throws PulsarClientException {
+        return pulsarClient.newReader(Schema.STRING)
+                .topic(topicName)
+                .startMessageId(MessageId.earliest)
+                .loadConf(readerConfigMap)
+                .create();
+    }
 }
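
For the Debezium connectors, the new field is set on the database history configuration the same way the updated test below does it; a hedged sketch (the service URL and topic values are placeholders, everything else mirrors the test):

    import io.debezium.config.Configuration;
    import io.debezium.relational.history.DatabaseHistory;
    import io.debezium.relational.history.DatabaseHistoryListener;
    import org.apache.pulsar.io.debezium.PulsarDatabaseHistory;

    public class DebeziumReaderConfigSketch {
        public static void main(String[] args) {
            Configuration config = Configuration.create()
                    .with(PulsarDatabaseHistory.SERVICE_URL, "pulsar://localhost:6650")           // placeholder
                    .with(PulsarDatabaseHistory.TOPIC, "persistent://public/default/db-history")  // placeholder
                    .with(DatabaseHistory.NAME, "my-db-history")
                    // extra reader options as a JSON string, e.g. a fixed subscription name
                    .with(PulsarDatabaseHistory.READER_CONFIG, "{\"subscriptionName\":\"my-subscription\"}")
                    .build();

            PulsarDatabaseHistory history = new PulsarDatabaseHistory();
            history.configure(config, null, DatabaseHistoryListener.NOOP, true);
            history.start();
            // Readers created by createHistoryReader() now receive the extra options via loadConf().
        }
    }
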
diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
index 04334da5e43..081cfdcc543 100644
--- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.debezium;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import io.debezium.config.Configuration;
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -34,12 +35,14 @@ import io.debezium.util.Collect;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
 import java.util.Base64;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -74,7 +77,7 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder) throws Exception {
+    private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
         Configuration.Builder configBuidler = Configuration.create()
                 .with(PulsarDatabaseHistory.TOPIC, topicName)
                 .with(DatabaseHistory.NAME, "my-db-history")
@@ -93,6 +96,10 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
             configBuidler.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString());
         }
 
+        if (testWithReaderConfig) {
+            configBuidler.with(PulsarDatabaseHistory.READER_CONFIG, "{\"subscriptionName\":\"my-subscription\"}");
+        }
+
         // Start up the history ...
         history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
         history.start();
@@ -122,8 +129,8 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
         // Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
         setLogPosition(10);
         ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
-            "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
-            "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
+                "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
+                "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
         history.record(source, position, "db1", ddl);
 
         // Parse the DDL statement 3x and each time update a different Tables object ...
@@ -181,6 +188,10 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
         assertEquals(recoveredTables, tables3);
     }
 
+    private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder) throws Exception {
+        testHistoryTopicContent(skipUnparseableDDL, testWithClientBuilder, false);
+    }
+
     protected void setLogPosition(int index) {
         this.position = Collect.hashMapOf("filename", "my-txn-file.log",
             "position", index);
@@ -239,4 +250,17 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
         // dummytopic should not exist yet
         assertFalse(history.exists());
     }
+
+    @Test
+    public void testSubscriptionName() throws Exception {
+        testHistoryTopicContent(true, false, true);
+        assertTrue(history.exists());
+        try (Reader<String> ignored = history.createHistoryReader()) {
+            List<String> subscriptions = admin.topics().getSubscriptions(topicName);
+            assertEquals(subscriptions.size(), 1);
+            assertTrue(subscriptions.contains("my-subscription"));
+        } catch (Exception e) {
+            fail("Failed to create history reader");
+        }
+    }
 }
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index f43933dfc83..334ba9255c4 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -38,6 +38,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index a6cc72517fe..bf66b6fc98a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -43,6 +43,14 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
     public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace";
     private static final String TOPIC_NAMESPACE_CONFIG_DOC = "namespace of topic name to store the output topics";
 
+    /**
+     * <code>offset.storage.reader.config</code>.
+     */
+    public static final String OFFSET_STORAGE_READER_CONFIG = "offset.storage.reader.config";
+    private static final String OFFSET_STORAGE_READER_CONFIG_DOC = "The configs of the reader for the "
+            + "kafka connector offsets topic, in the form of a JSON string with key-value pairs";
+
+
     static {
         CONFIG = new ConfigDef()
             .define(OFFSET_STORAGE_TOPIC_CONFIG,
@@ -53,7 +61,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
                 Type.STRING,
                 "public/default",
                 Importance.HIGH,
-                TOPIC_NAMESPACE_CONFIG_DOC);
+                TOPIC_NAMESPACE_CONFIG_DOC)
+            .define(OFFSET_STORAGE_READER_CONFIG,
+                    Type.STRING,
+                    null,
+                    Importance.HIGH,
+                    OFFSET_STORAGE_READER_CONFIG_DOC);
     }
 
     public PulsarKafkaWorkerConfig(Map<String, String> props) {
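
On the Kafka Connect adaptor side, offset.storage.reader.config is just another worker property; a minimal sketch in the spirit of the updated PulsarOffsetBackingStoreTest (the topic name is taken from that test and is only an example):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;

    public class OffsetReaderConfigSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
                    "persistent://my-property/my-ns/offset-topic");
            props.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG,
                    "{\"subscriptionName\":\"my-subscription\"}");
            PulsarKafkaWorkerConfig workerConfig = new PulsarKafkaWorkerConfig(props);
            // PulsarOffsetBackingStore#configure(workerConfig) parses the JSON string and
            // applies it to the offsets-topic reader via loadConf().
            System.out.println(workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG));
        }
    }
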
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 86905ad8990..d2f5aeef72a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.io.kafka.connect;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -54,6 +56,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
     private PulsarClient client;
     private String topic;
+    private Map<String, Object> readerConfigMap = new HashMap<>();
     private Producer<byte[]> producer;
     private Reader<byte[]> reader;
     private volatile CompletableFuture<Void> outstandingReadToEnd = null;
@@ -67,6 +70,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     public void configure(WorkerConfig workerConfig) {
         this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         checkArgument(!isBlank(topic), "Offset storage topic must be specified");
+        try {
+            this.readerConfigMap = loadConfigFromJsonString(
+                    workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG));
+        } catch (JsonProcessingException exception) {
+            log.warn("The provided reader configs are invalid, "
+                    + "will not pass any extra config to the reader builder.", exception);
+        }
 
         log.info("Configure offset backing store on pulsar topic {}", topic);
     }
@@ -148,6 +158,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
             reader = client.newReader(Schema.BYTES)
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
+                    .loadConf(readerConfigMap)
                 .create();
             log.info("Successfully created reader to replay updates from topic {}", topic);
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index bb2eced011f..7b71ba1a13f 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -64,13 +64,10 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
 
         this.topicName = "persistent://my-property/my-ns/offset-topic";
         this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicName);
-        this.distributedConfig = new PulsarKafkaWorkerConfig(this.defaultProps);
         this.client = PulsarClient.builder()
                 .serviceUrl(brokerUrl.toString())
                 .build();
         this.offsetBackingStore = new PulsarOffsetBackingStore(client);
-        this.offsetBackingStore.configure(distributedConfig);
-        this.offsetBackingStore.start();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -84,8 +81,19 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    private void testOffsetBackingStore(boolean testWithReaderConfig) throws Exception {
+        if (testWithReaderConfig) {
+            this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG,
+                    "{\"subscriptionName\":\"my-subscription\"}");
+        }
+        this.distributedConfig = new PulsarKafkaWorkerConfig(this.defaultProps);
+        this.offsetBackingStore.configure(distributedConfig);
+        this.offsetBackingStore.start();
+    }
+
     @Test
     public void testGetFromEmpty() throws Exception {
+        testOffsetBackingStore(false);
         assertTrue(offsetBackingStore.get(
             Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
         ).get().isEmpty());
@@ -93,11 +101,13 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
 
     @Test
     public void testGetSet() throws Exception {
+        testOffsetBackingStore(false);
         testGetSet(false);
     }
 
     @Test
     public void testGetSetCallback() throws Exception {
+        testOffsetBackingStore(false);
         testGetSet(true);
     }
 
@@ -139,4 +149,12 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
             assertEquals(new String(valData, UTF_8), "test-val-" + idx);
         });
     }
+
+    @Test
+    public void testWithReaderConfig() throws Exception {
+        testOffsetBackingStore(true);
+        testGetSet(false);
+        List<String> subscriptions = admin.topics().getSubscriptions(topicName);
+        assertTrue(subscriptions.contains("my-subscription"));
+    }
 }