You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/14 07:20:32 UTC

[incubator-eventmesh] branch master updated: Fixes ISSUE #1494 Enable authentication for pulsar client in connector

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 91ada6fc Fixes ISSUE #1494  Enable authentication for pulsar client in connector
     new c86f270d Merge pull request #1519 from fengyongshe/feat-add-authentication
91ada6fc is described below

commit 91ada6fcaa3e84cdebbfc3b811955996d0c02fe0
Author: fengyongshe <fe...@139.com>
AuthorDate: Wed Oct 12 09:46:04 2022 +0800

    Fixes ISSUE #1494  Enable authentication for pulsar client in connector
    
    Motivation
      When authentication is enabled in the Pulsar cluster, the client needs authentication parameters.
    
    Modifications
      Add auth parameters to ClientConfiguration and configure them in the producer & consumer
    
    Documentation
---
 .../pulsar/config/ClientConfiguration.java         | 17 +++++++++++++++
 .../pulsar/consumer/PulsarConsumerImpl.java        | 22 +++++++++++++++-----
 .../connector/pulsar/producer/ProducerImpl.java    | 24 +++++++++++++++++++---
 .../pulsar/producer/PulsarProducerImpl.java        |  5 -----
 4 files changed, 55 insertions(+), 13 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java
index 67f403c1..ae0bc562 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java
@@ -29,15 +29,32 @@ import lombok.Setter;
 public class ClientConfiguration {
 
     private String serviceAddr;
+    private String authPlugin;
+    private String authParams;
+
+    private static ClientConfiguration INSTANCE = null;
 
     public void init() {
         String serviceAddrStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_PULSAR_SERVICE_ADDR);
         Preconditions.checkState(StringUtils.isNotEmpty(serviceAddrStr),
                 String.format("%s error", ConfKeys.KEYS_EVENTMESH_PULSAR_SERVICE_ADDR));
         serviceAddr = StringUtils.trim(serviceAddrStr);
+        authPlugin = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_PULSAR_AUTH_PLUGIN);
+        authParams = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_PULSAR_AUTH_PARAMS);
+    }
+
+    public static ClientConfiguration getInstance() {
+        if (INSTANCE == null) {
+            INSTANCE = new ClientConfiguration();
+            INSTANCE.init();
+        }
+        return INSTANCE;
     }
 
     static class ConfKeys {
         public static final String KEYS_EVENTMESH_PULSAR_SERVICE_ADDR = "eventMesh.server.pulsar.service";
+        public static final String KEYS_EVENTMESH_PULSAR_AUTH_PLUGIN = "eventMesh.server.pulsar.authPlugin";
+        public static final String KEYS_EVENTMESH_PULSAR_AUTH_PARAMS = "eventMesh.server.pulsar.authParams";
     }
+
 }
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java
index c5e61fe5..0584f100 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java
@@ -26,6 +26,7 @@ import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
 
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -39,6 +40,8 @@ import io.cloudevents.core.format.EventDeserializationException;
 import io.cloudevents.core.provider.EventFormatProvider;
 import io.cloudevents.jackson.JsonFormat;
 
+import com.google.common.base.Preconditions;
+
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -54,13 +57,22 @@ public class PulsarConsumerImpl implements Consumer {
     public void init(Properties properties) throws Exception {
         this.properties = properties;
 
-        final ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.init();
+        final ClientConfiguration clientConfiguration = ClientConfiguration.getInstance();
 
         try {
-            this.pulsarClient = PulsarClient.builder()
-                .serviceUrl(clientConfiguration.getServiceAddr())
-                .build();
+            ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(clientConfiguration.getServiceAddr());
+
+            if (clientConfiguration.getAuthPlugin() != null) {
+                Preconditions.checkNotNull(clientConfiguration.getAuthParams(),
+                    "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
+                clientBuilder.authentication(
+                    clientConfiguration.getAuthPlugin(),
+                    clientConfiguration.getAuthParams()
+                );
+            }
+
+            this.pulsarClient = clientBuilder.build();
         } catch (Exception ex) {
             throw new ConnectorRuntimeException(
               String.format("Failed to connect pulsar with exception: {}", ex.getMessage()));
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
index 2e52cb0d..084c7fe3 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
@@ -20,8 +20,10 @@ package org.apache.eventmesh.connector.pulsar.producer;
 import org.apache.eventmesh.api.SendCallback;
 import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
 import org.apache.eventmesh.api.exception.OnExceptionContext;
+import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
 import org.apache.eventmesh.connector.pulsar.utils.CloudEventUtils;
 
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 
@@ -33,12 +35,18 @@ import io.cloudevents.CloudEvent;
 import io.cloudevents.core.provider.EventFormatProvider;
 import io.cloudevents.jackson.JsonFormat;
 
+import com.google.common.base.Preconditions;
+
 public class ProducerImpl extends AbstractProducer {
+
     private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ClientConfiguration clientConfiguration;
     private PulsarClient pulsarClient;
 
     public ProducerImpl(final Properties properties) {
         super(properties);
+        this.clientConfiguration = ClientConfiguration.getInstance();
     }
 
     public void publish(CloudEvent cloudEvent, SendCallback sendCallback) {
@@ -75,9 +83,19 @@ public class ProducerImpl extends AbstractProducer {
     public void start() {
         try {
             this.started.compareAndSet(false, true);
-            this.pulsarClient = PulsarClient.builder()
-                    .serviceUrl(this.properties().get("url").toString())
-                    .build();
+            ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(clientConfiguration.getServiceAddr());
+
+            if (clientConfiguration.getAuthPlugin() != null) {
+                Preconditions.checkNotNull(clientConfiguration.getAuthParams(),
+                    "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
+                clientBuilder.authentication(
+                    clientConfiguration.getAuthPlugin(),
+                    clientConfiguration.getAuthParams()
+                );
+            }
+
+            this.pulsarClient = clientBuilder.build();
         } catch (Exception ignored) {
             // ignored
         }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java
index f80bc588..64c08847 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java
@@ -21,7 +21,6 @@ import org.apache.eventmesh.api.RequestReplyCallback;
 import org.apache.eventmesh.api.SendCallback;
 import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
 import org.apache.eventmesh.api.producer.Producer;
-import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
 
 import java.util.Properties;
 
@@ -33,10 +32,6 @@ public class PulsarProducerImpl implements Producer {
 
     @Override
     public synchronized void init(Properties properties) {
-        final ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.init();
-
-        properties.put("url", clientConfiguration.getServiceAddr());
         producer = new ProducerImpl(properties);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org