You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rf...@apache.org on 2021/09/23 03:35:21 UTC

[pulsar] 01/01: stash

This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch freeznet/fix-debezium-client-auth
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ae04fb4f6b1634800b79593a55ba570880deaed
Author: Rui Fu <fr...@gmail.com>
AuthorDate: Thu Sep 23 11:04:23 2021 +0800

    stash
---
 .../java/org/apache/pulsar/io/debezium/DebeziumSource.java     | 10 +++++-----
 .../org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java   |  7 ++++---
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index b9074b9..6f75233 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -81,9 +81,6 @@ public abstract class DebeziumSource extends KafkaConnectSource {
 
         // database.history.pulsar.service.url
         String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
-        if (StringUtils.isEmpty(pulsarUrl)) {
-            throw new IllegalArgumentException("Pulsar service URL for History Database not provided.");
-        }
 
         String topicNamespace = topicNamespace(sourceContext);
         // topic.namespace
@@ -97,8 +94,11 @@ public abstract class DebeziumSource extends KafkaConnectSource {
         setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
             topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
 
-        config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder",
-                SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
+        // pass pulsar.client.builder if database.history.pulsar.service.url is not provided
+        if (StringUtils.isEmpty(pulsarUrl)) {
+            String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
+            config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder);
+        }
 
         super.open(config, sourceContext);
     }
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index be152a6..a98e817 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -103,14 +103,15 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         }
         this.topicName = config.getString(TOPIC);
 
-        if (config.getString(CLIENT_BUILDER) == null && config.getString(SERVICE_URL) == null) {
+        if (StringUtils.isEmpty(config.getString(CLIENT_BUILDER)) && StringUtils.isEmpty(config.getString(SERVICE_URL))) {
             throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
         }
         String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
         this.clientBuilder = PulsarClient.builder();
-        if (null != clientBuilderBase64Encoded) {
+        if (!StringUtils.isEmpty(clientBuilderBase64Encoded)) {
             // deserialize the client builder to the same classloader
-            this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader());
+            this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded,
+                    this.clientBuilder.getClass().getClassLoader());
         } else {
             this.clientBuilder.serviceUrl(config.getString(SERVICE_URL));
         }