You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rf...@apache.org on 2021/09/23 03:35:20 UTC

[pulsar] branch freeznet/fix-debezium-client-auth created (now 5ae04fb)

This is an automated email from the ASF dual-hosted git repository.

rfu pushed a change to branch freeznet/fix-debezium-client-auth
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


      at 5ae04fb  stash

This branch includes the following new commits:

     new 5ae04fb  stash

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[pulsar] 01/01: stash

Posted by rf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch freeznet/fix-debezium-client-auth
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ae04fb4f6b1634800b79593a55ba570880deaed
Author: Rui Fu <fr...@gmail.com>
AuthorDate: Thu Sep 23 11:04:23 2021 +0800

    stash
---
 .../java/org/apache/pulsar/io/debezium/DebeziumSource.java     | 10 +++++-----
 .../org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java   |  7 ++++---
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index b9074b9..6f75233 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -81,9 +81,6 @@ public abstract class DebeziumSource extends KafkaConnectSource {
 
         // database.history.pulsar.service.url
         String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
-        if (StringUtils.isEmpty(pulsarUrl)) {
-            throw new IllegalArgumentException("Pulsar service URL for History Database not provided.");
-        }
 
         String topicNamespace = topicNamespace(sourceContext);
         // topic.namespace
@@ -97,8 +94,11 @@ public abstract class DebeziumSource extends KafkaConnectSource {
         setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
             topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
 
-        config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder",
-                SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
+        // pass pulsar.client.builder if database.history.pulsar.service.url is not provided
+        if (StringUtils.isEmpty(pulsarUrl)) {
+            String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
+            config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder);
+        }
 
         super.open(config, sourceContext);
     }
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index be152a6..a98e817 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -103,14 +103,15 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         }
         this.topicName = config.getString(TOPIC);
 
-        if (config.getString(CLIENT_BUILDER) == null && config.getString(SERVICE_URL) == null) {
+        if (StringUtils.isEmpty(config.getString(CLIENT_BUILDER)) && StringUtils.isEmpty(config.getString(SERVICE_URL))) {
             throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
         }
         String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
         this.clientBuilder = PulsarClient.builder();
-        if (null != clientBuilderBase64Encoded) {
+        if (!StringUtils.isEmpty(clientBuilderBase64Encoded)) {
             // deserialize the client builder to the same classloader
-            this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader());
+            this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded,
+                    this.clientBuilder.getClass().getClassLoader());
         } else {
             this.clientBuilder.serviceUrl(config.getString(SERVICE_URL));
         }