You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/14 07:20:32 UTC
[incubator-eventmesh] branch master updated: Fixes ISSUE #1494 Enable authentication for pulsar client in connector
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 91ada6fc Fixes ISSUE #1494 Enable authentication for pulsar client in connector
new c86f270d Merge pull request #1519 from fengyongshe/feat-add-authentication
91ada6fc is described below
commit 91ada6fcaa3e84cdebbfc3b811955996d0c02fe0
Author: fengyongshe <fe...@139.com>
AuthorDate: Wed Oct 12 09:46:04 2022 +0800
Fixes ISSUE #1494 Enable authentication for pulsar client in connector
Motivation
When authentication is enabled in the Pulsar cluster, the client needs authentication parameters.
Modifications
Add auth parameters to ClientConfiguration and configure them in the producer and consumer.
Documentation
---
.../pulsar/config/ClientConfiguration.java | 17 +++++++++++++++
.../pulsar/consumer/PulsarConsumerImpl.java | 22 +++++++++++++++-----
.../connector/pulsar/producer/ProducerImpl.java | 24 +++++++++++++++++++---
.../pulsar/producer/PulsarProducerImpl.java | 5 -----
4 files changed, 55 insertions(+), 13 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java
index 67f403c1..ae0bc562 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/config/ClientConfiguration.java
@@ -29,15 +29,32 @@ import lombok.Setter;
public class ClientConfiguration {
private String serviceAddr;
+ private String authPlugin;
+ private String authParams;
+
+ private static ClientConfiguration INSTANCE = null;
public void init() {
String serviceAddrStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_PULSAR_SERVICE_ADDR);
Preconditions.checkState(StringUtils.isNotEmpty(serviceAddrStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_PULSAR_SERVICE_ADDR));
serviceAddr = StringUtils.trim(serviceAddrStr);
+ authPlugin = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_PULSAR_AUTH_PLUGIN);
+ authParams = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_PULSAR_AUTH_PARAMS);
+ }
+
+ public static ClientConfiguration getInstance() {
+ if (INSTANCE == null) {
+ INSTANCE = new ClientConfiguration();
+ INSTANCE.init();
+ }
+ return INSTANCE;
}
static class ConfKeys {
public static final String KEYS_EVENTMESH_PULSAR_SERVICE_ADDR = "eventMesh.server.pulsar.service";
+ public static final String KEYS_EVENTMESH_PULSAR_AUTH_PLUGIN = "eventMesh.server.pulsar.authPlugin";
+ public static final String KEYS_EVENTMESH_PULSAR_AUTH_PARAMS = "eventMesh.server.pulsar.authParams";
}
+
}
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java
index c5e61fe5..0584f100 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java
@@ -26,6 +26,7 @@ import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -39,6 +40,8 @@ import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
+import com.google.common.base.Preconditions;
+
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -54,13 +57,22 @@ public class PulsarConsumerImpl implements Consumer {
public void init(Properties properties) throws Exception {
this.properties = properties;
- final ClientConfiguration clientConfiguration = new ClientConfiguration();
- clientConfiguration.init();
+ final ClientConfiguration clientConfiguration = ClientConfiguration.getInstance();
try {
- this.pulsarClient = PulsarClient.builder()
- .serviceUrl(clientConfiguration.getServiceAddr())
- .build();
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .serviceUrl(clientConfiguration.getServiceAddr());
+
+ if (clientConfiguration.getAuthPlugin() != null) {
+ Preconditions.checkNotNull(clientConfiguration.getAuthParams(),
+ "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
+ clientBuilder.authentication(
+ clientConfiguration.getAuthPlugin(),
+ clientConfiguration.getAuthParams()
+ );
+ }
+
+ this.pulsarClient = clientBuilder.build();
} catch (Exception ex) {
throw new ConnectorRuntimeException(
String.format("Failed to connect pulsar with exception: {}", ex.getMessage()));
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
index 2e52cb0d..084c7fe3 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
@@ -20,8 +20,10 @@ package org.apache.eventmesh.connector.pulsar.producer;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.api.exception.OnExceptionContext;
+import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
import org.apache.eventmesh.connector.pulsar.utils.CloudEventUtils;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -33,12 +35,18 @@ import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
+import com.google.common.base.Preconditions;
+
public class ProducerImpl extends AbstractProducer {
+
private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ClientConfiguration clientConfiguration;
private PulsarClient pulsarClient;
public ProducerImpl(final Properties properties) {
super(properties);
+ this.clientConfiguration = ClientConfiguration.getInstance();
}
public void publish(CloudEvent cloudEvent, SendCallback sendCallback) {
@@ -75,9 +83,19 @@ public class ProducerImpl extends AbstractProducer {
public void start() {
try {
this.started.compareAndSet(false, true);
- this.pulsarClient = PulsarClient.builder()
- .serviceUrl(this.properties().get("url").toString())
- .build();
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .serviceUrl(clientConfiguration.getServiceAddr());
+
+ if (clientConfiguration.getAuthPlugin() != null) {
+ Preconditions.checkNotNull(clientConfiguration.getAuthParams(),
+ "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
+ clientBuilder.authentication(
+ clientConfiguration.getAuthPlugin(),
+ clientConfiguration.getAuthParams()
+ );
+ }
+
+ this.pulsarClient = clientBuilder.build();
} catch (Exception ignored) {
// ignored
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java
index f80bc588..64c08847 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/PulsarProducerImpl.java
@@ -21,7 +21,6 @@ import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.api.producer.Producer;
-import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
import java.util.Properties;
@@ -33,10 +32,6 @@ public class PulsarProducerImpl implements Producer {
@Override
public synchronized void init(Properties properties) {
- final ClientConfiguration clientConfiguration = new ClientConfiguration();
- clientConfiguration.init();
-
- properties.put("url", clientConfiguration.getServiceAddr());
producer = new ProducerImpl(properties);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org