You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/25 17:27:02 UTC
[pulsar] branch branch-2.10 updated: [improve][connector] add reader config to `pulsar-io-debezium` and `pulsar-io-kafka-connect-adaptor` (#16675)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new d923c842b47 [improve][connector] add reader config to `pulsar-io-debezium` and `pulsar-io-kafka-connect-adaptor` (#16675)
d923c842b47 is described below
commit d923c842b473e4c32daa46a73e1e71c4db23ec67
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Mon Jul 25 16:44:41 2022 +0000
[improve][connector] add reader config to `pulsar-io-debezium` and `pulsar-io-kafka-connect-adaptor` (#16675)
(cherry picked from commit 9c199761387dcad97316f482b9ceeaf1980a3334)
---
.../org/apache/pulsar/io/common/IOConfigUtils.java | 13 ++++++
.../pulsar/io/debezium/PulsarDatabaseHistory.java | 46 ++++++++++++++++------
.../io/debezium/PulsarDatabaseHistoryTest.java | 30 ++++++++++++--
pulsar-io/kafka-connect-adaptor/pom.xml | 6 +++
.../io/kafka/connect/PulsarKafkaWorkerConfig.java | 15 ++++++-
.../io/kafka/connect/PulsarOffsetBackingStore.java | 11 ++++++
.../connect/PulsarOffsetBackingStoreTest.java | 24 +++++++++--
7 files changed, 127 insertions(+), 18 deletions(-)
diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index 1dd50525d38..b96585828b0 100644
--- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -18,9 +18,13 @@
*/
package org.apache.pulsar.io.common;
+import static org.apache.commons.lang.StringUtils.isBlank;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@@ -40,6 +44,15 @@ public class IOConfigUtils {
return loadWithSecrets(map, clazz, secretName -> sinkContext.getSecret(secretName));
}
+    /**
+     * Parses a JSON object string into a configuration map.
+     *
+     * @param config JSON text holding key-value pairs; may be null or blank
+     * @return the parsed entries, or an empty map when {@code config} is null, blank,
+     *         or the JSON literal {@code null}
+     * @throws JsonProcessingException if {@code config} cannot be parsed into a map
+     */
+    public static Map<String, Object> loadConfigFromJsonString(String config) throws JsonProcessingException {
+        if (isBlank(config)) {
+            return Collections.emptyMap();
+        }
+        Map<String, Object> parsed = new ObjectMapper().readValue(config,
+                new TypeReference<Map<String, Object>>() {
+                });
+        // Jackson maps the JSON literal "null" to a null reference; the result is handed
+        // directly to reader builders, so never let null escape from here.
+        return parsed != null ? parsed : Collections.emptyMap();
+    }
private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz,
Function<String, String> secretsGetter) {
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index f004d244611..306a791fa9e 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.io.debezium;
import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
@@ -30,6 +33,8 @@ import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@@ -77,14 +82,26 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
.withDescription("Pulsar client builder")
.withValidation(Field::isOptional);
+ /**
+ * Optional setting carrying extra reader options for the database history topic,
+ * supplied as a JSON string of key-value pairs. Defaults to {@code null}.
+ */
+ public static final Field READER_CONFIG = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.reader.config")
+ .withDisplayName("Extra configs of the reader")
+ .withType(Type.STRING)
+ .withWidth(Width.LONG)
+ .withImportance(Importance.HIGH)
+ .withDescription("The configs of the reader for the database schema history topic, "
+ + "in the form of a JSON string with key-value pairs")
+ .withDefault((String) null)
+ .withValidation(Field::isOptional);
+
public static final Field.Set ALL_FIELDS = Field.setOf(
TOPIC,
SERVICE_URL,
CLIENT_BUILDER,
- DatabaseHistory.NAME);
+ DatabaseHistory.NAME,
+ READER_CONFIG);
private final DocumentReader reader = DocumentReader.defaultReader();
private String topicName;
+ private Map<String, Object> readerConfigMap = new HashMap<>();
private String dbHistoryName;
private ClientBuilder clientBuilder;
private volatile PulsarClient pulsarClient;
@@ -102,6 +119,12 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
+ getClass().getSimpleName() + "; check the logs for details");
}
this.topicName = config.getString(TOPIC);
+ try {
+ this.readerConfigMap = loadConfigFromJsonString(config.getString(READER_CONFIG));
+ } catch (JsonProcessingException exception) {
+ log.warn("The provided reader configs are invalid, "
+ + "will not passing any extra config to the reader builder.", exception);
+ }
String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
@@ -210,11 +233,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
@Override
protected void recoverRecords(Consumer<HistoryRecord> records) {
setupClientIfNeeded();
- try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
- .topic(topicName)
- .startMessageId(MessageId.earliest)
- .create()
- ) {
+ try (Reader<String> historyReader = createHistoryReader()) {
log.info("Scanning the database history topic '{}'", topicName);
// Read all messages in the topic ...
@@ -257,11 +276,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
@Override
public boolean exists() {
setupClientIfNeeded();
- try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
- .topic(topicName)
- .startMessageId(MessageId.earliest)
- .create()
- ) {
+ try (Reader<String> historyReader = createHistoryReader()) {
return historyReader.hasMessageAvailable();
} catch (IOException e) {
log.error("Encountered issues on checking existence of database history", e);
@@ -281,4 +296,13 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
}
return "Pulsar topic";
}
+
+ /**
+ * Creates a reader on the database history topic, starting from the earliest message.
+ * The extra reader settings held in {@code readerConfigMap} are loaded into the builder
+ * before the reader is created.
+ *
+ * @return a new reader; callers in this class close it via try-with-resources
+ * @throws PulsarClientException propagated from the reader builder
+ */
+ @VisibleForTesting
+ Reader<String> createHistoryReader() throws PulsarClientException {
+ return pulsarClient.newReader(Schema.STRING)
+ .topic(topicName)
+ .startMessageId(MessageId.earliest)
+ .loadConf(readerConfigMap)
+ .create();
+ }
}
diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
index 04334da5e43..081cfdcc543 100644
--- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.debezium;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -34,12 +35,14 @@ import io.debezium.util.Collect;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
+import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -74,7 +77,7 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
super.internalCleanup();
}
- private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder) throws Exception {
+ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
Configuration.Builder configBuidler = Configuration.create()
.with(PulsarDatabaseHistory.TOPIC, topicName)
.with(DatabaseHistory.NAME, "my-db-history")
@@ -93,6 +96,10 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
configBuidler.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString());
}
+ if (testWithReaderConfig) {
+ configBuidler.with(PulsarDatabaseHistory.READER_CONFIG, "{\"subscriptionName\":\"my-subscription\"}");
+ }
+
// Start up the history ...
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
history.start();
@@ -122,8 +129,8 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
// Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
setLogPosition(10);
ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
- "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
- "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
+ "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
+ "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
history.record(source, position, "db1", ddl);
// Parse the DDL statement 3x and each time update a different Tables object ...
@@ -181,6 +188,10 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
assertEquals(recoveredTables, tables3);
}
+ // Convenience overload for the existing tests: runs the scenario without any reader config.
+ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder) throws Exception {
+ testHistoryTopicContent(skipUnparseableDDL, testWithClientBuilder, false);
+ }
+
protected void setLogPosition(int index) {
this.position = Collect.hashMapOf("filename", "my-txn-file.log",
"position", index);
@@ -239,4 +250,17 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
// dummytopic should not exist yet
assertFalse(history.exists());
}
+
+    /**
+     * Verifies that a subscription name supplied through the reader config is the one
+     * used by the history reader.
+     */
+    @Test
+    public void testSubscriptionName() throws Exception {
+        testHistoryTopicContent(true, false, true);
+        assertTrue(history.exists());
+        try (Reader<String> ignored = history.createHistoryReader()) {
+            List<String> subscriptions = admin.topics().getSubscriptions(topicName);
+            assertEquals(subscriptions.size(), 1);
+            assertTrue(subscriptions.contains("my-subscription"));
+        } catch (Exception e) {
+            // Keep the cause so the failure report shows why the reader could not be used.
+            fail("Failed to create history reader", e);
+        }
+    }
}
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 99150617956..98f3cc431f4 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -38,6 +38,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index b367307cf7d..70b0ab887c8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -43,6 +43,14 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace";
private static final String TOPIC_NAMESPACE_CONFIG_DOC = "namespace of topic name to store the output topics";
+ /**
+ * <code>offset.storage.reader.config</code>.
+ */
+ public static final String OFFSET_STORAGE_READER_CONFIG = "offset.storage.reader.config";
+ private static final String OFFSET_STORAGE_READER_CONFIG_DOC = "The configs of the reader for the "
+ + "kafka connector offsets topic, in the form of a JSON string with key-value pairs";
+
+
static {
CONFIG = new ConfigDef()
.define(OFFSET_STORAGE_TOPIC_CONFIG,
@@ -53,7 +61,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
Type.STRING,
"public/default",
Importance.HIGH,
- TOPIC_NAMESPACE_CONFIG_DOC);
+ TOPIC_NAMESPACE_CONFIG_DOC)
+ .define(OFFSET_STORAGE_READER_CONFIG,
+ Type.STRING,
+ null,
+ Importance.HIGH,
+ OFFSET_STORAGE_READER_CONFIG_DOC);
}
public PulsarKafkaWorkerConfig(Map<String, String> props) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 86905ad8990..d2f5aeef72a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.io.kafka.connect;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -54,6 +56,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
private PulsarClient client;
private String topic;
+ private Map<String, Object> readerConfigMap = new HashMap<>();
private Producer<byte[]> producer;
private Reader<byte[]> reader;
private volatile CompletableFuture<Void> outstandingReadToEnd = null;
@@ -67,6 +70,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
public void configure(WorkerConfig workerConfig) {
this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
checkArgument(!isBlank(topic), "Offset storage topic must be specified");
+ // The reader config is optional: if its JSON cannot be parsed, a warning is logged
+ // and readerConfigMap keeps its current value (an empty map unless set earlier).
+ try {
+ this.readerConfigMap = loadConfigFromJsonString(
+ workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG));
+ } catch (JsonProcessingException exception) {
+ log.warn("The provided reader configs are invalid, "
+ + "will not passing any extra config to the reader builder.", exception);
+ }
log.info("Configure offset backing store on pulsar topic {}", topic);
}
@@ -148,6 +158,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
reader = client.newReader(Schema.BYTES)
.topic(topic)
.startMessageId(MessageId.earliest)
+ .loadConf(readerConfigMap)
.create();
log.info("Successfully created reader to replay updates from topic {}", topic);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index 52a004510a7..f45df6ea216 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -61,13 +61,10 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
this.topicName = "persistent://my-property/my-ns/offset-topic";
this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicName);
- this.distributedConfig = new PulsarKafkaWorkerConfig(this.defaultProps);
this.client = PulsarClient.builder()
.serviceUrl(brokerUrl.toString())
.build();
this.offsetBackingStore = new PulsarOffsetBackingStore(client);
- this.offsetBackingStore.configure(distributedConfig);
- this.offsetBackingStore.start();
}
@AfterMethod(alwaysRun = true)
@@ -81,8 +78,19 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
super.internalCleanup();
}
+ // Shared setup helper (not a test itself): optionally adds a reader config with a
+ // subscription name to the worker properties, then builds the worker config and
+ // configures and starts the offset backing store.
+ private void testOffsetBackingStore(boolean testWithReaderConfig) throws Exception {
+ if (testWithReaderConfig) {
+ this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG,
+ "{\"subscriptionName\":\"my-subscription\"}");
+ }
+ this.distributedConfig = new PulsarKafkaWorkerConfig(this.defaultProps);
+ this.offsetBackingStore.configure(distributedConfig);
+ this.offsetBackingStore.start();
+ }
+
@Test
public void testGetFromEmpty() throws Exception {
+ testOffsetBackingStore(false);
assertTrue(offsetBackingStore.get(
Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
).get().isEmpty());
@@ -90,11 +98,13 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
@Test
public void testGetSet() throws Exception {
+ testOffsetBackingStore(false);
testGetSet(false);
}
@Test
public void testGetSetCallback() throws Exception {
+ testOffsetBackingStore(false);
testGetSet(true);
}
@@ -136,4 +146,12 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
assertEquals(new String(valData, UTF_8), "test-val-" + idx);
});
}
+
+ // Starts the store with a reader config, runs the get/set round trip, then checks that
+ // the configured subscription name shows up on the offsets topic.
+ @Test
+ public void testWithReaderConfig() throws Exception {
+ testOffsetBackingStore(true);
+ testGetSet(false);
+ List<String> subscriptions = admin.topics().getSubscriptions(topicName);
+ assertTrue(subscriptions.contains("my-subscription"));
+ }
}