You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/26 02:27:37 UTC
[incubator-eventmesh] branch master updated: Fixes ISSUE #1636
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new d86adeab Fixes ISSUE #1636
new 383c840d Merge pull request #1862 from fengyongshe/issue-1636
d86adeab is described below
commit d86adeabde7b2cf674fd48e450fb20458215ff85
Author: fengyongshe <fe...@139.com>
AuthorDate: Tue Oct 25 23:50:56 2022 +0800
Fixes ISSUE #1636
Motivation
Improve the performance of publish event in connector-pulsar
Modifications
Hold the producer instance in map, create when used for the first time
Documentation
---
.../pulsar/client/PulsarClientWrapper.java | 110 +++++++++++++++++++++
.../connector/pulsar/producer/ProducerImpl.java | 70 +++----------
2 files changed, 123 insertions(+), 57 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/client/PulsarClientWrapper.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/client/PulsarClientWrapper.java
new file mode 100644
index 00000000..076ca005
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/client/PulsarClientWrapper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.pulsar.client;
+
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
+import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
+import org.apache.eventmesh.connector.pulsar.utils.CloudEventUtils;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarClientWrapper {
+
+ private ClientConfiguration config;
+ private PulsarClient pulsarClient;
+ private Map<String, Producer<byte[]>> producerMap = new HashMap<>();
+
+ public PulsarClientWrapper(ClientConfiguration config) {
+ this.config = config;
+ try {
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .serviceUrl(config.getServiceAddr());
+
+ if (config.getAuthPlugin() != null) {
+ Preconditions.checkNotNull(config.getAuthParams(),
+ "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
+ clientBuilder.authentication(
+ config.getAuthPlugin(),
+ config.getAuthParams()
+ );
+ }
+
+ this.pulsarClient = clientBuilder.build();
+ } catch (PulsarClientException ex) {
+ throw new ConnectorRuntimeException(
+ String.format("Failed to connect pulsar cluster %s with exception: %s", config.getServiceAddr(), ex.getMessage()));
+ }
+ }
+
+ private Producer<byte[]> createProducer(String topic) {
+ try {
+ return this.pulsarClient.newProducer()
+ .topic(topic)
+ .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
+ .sendTimeout(10, TimeUnit.SECONDS)
+ .blockIfQueueFull(true)
+ .create();
+ } catch (PulsarClientException ex) {
+ throw new ConnectorRuntimeException(
+ String.format("Failed to create pulsar producer for %s with exception: %s", topic, ex.getMessage()));
+ }
+ }
+
+ public void publish(CloudEvent cloudEvent, SendCallback sendCallback) {
+ String topic = cloudEvent.getSubject();
+ Producer<byte[]> producer = producerMap.computeIfAbsent(topic, k -> createProducer(topic));
+ try {
+ byte[] serializedCloudEvent = EventFormatProvider
+ .getInstance()
+ .resolveFormat(JsonFormat.CONTENT_TYPE)
+ .serialize(cloudEvent);
+ producer.sendAsync(serializedCloudEvent).thenAccept(messageId -> {
+ sendCallback.onSuccess(CloudEventUtils.convertSendResult(cloudEvent));
+ });
+ } catch (Exception ex) {
+ log.error("Failed to publish cloudEvent for {} with exception: {}",
+ cloudEvent.getSubject(), ex.getMessage());
+ }
+ }
+
+ public void shutdown() throws PulsarClientException {
+ pulsarClient.close();
+ for (Map.Entry<String, Producer<byte[]>> producerEntry : producerMap.entrySet()) {
+ producerEntry.getValue().close();
+ }
+ producerMap.clear();
+ }
+
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
index 084c7fe3..e49ef3b3 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/producer/ProducerImpl.java
@@ -18,62 +18,32 @@
package org.apache.eventmesh.connector.pulsar.producer;
import org.apache.eventmesh.api.SendCallback;
-import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
-import org.apache.eventmesh.api.exception.OnExceptionContext;
+import org.apache.eventmesh.connector.pulsar.client.PulsarClientWrapper;
import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
-import org.apache.eventmesh.connector.pulsar.utils.CloudEventUtils;
-
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.cloudevents.CloudEvent;
-import io.cloudevents.core.provider.EventFormatProvider;
-import io.cloudevents.jackson.JsonFormat;
-import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class ProducerImpl extends AbstractProducer {
private final AtomicBoolean started = new AtomicBoolean(false);
- private final ClientConfiguration clientConfiguration;
- private PulsarClient pulsarClient;
+ private ClientConfiguration config;
+ private PulsarClientWrapper pulsarClient;
public ProducerImpl(final Properties properties) {
super(properties);
- this.clientConfiguration = ClientConfiguration.getInstance();
+ this.config = new ClientConfiguration();
+ this.config.init();
}
public void publish(CloudEvent cloudEvent, SendCallback sendCallback) {
-
- try {
- Producer<byte[]> producer = this.pulsarClient.newProducer()
- .topic(cloudEvent.getSubject())
- .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
- .sendTimeout(10, TimeUnit.SECONDS)
- .blockIfQueueFull(true)
- .create();
-
- byte[] serializedCloudEvent = EventFormatProvider
- .getInstance()
- .resolveFormat(JsonFormat.CONTENT_TYPE)
- .serialize(cloudEvent);
-
- producer.sendAsync(serializedCloudEvent).thenAccept(messageId -> {
- sendCallback.onSuccess(CloudEventUtils.convertSendResult(cloudEvent));
- });
- } catch (Exception e) {
- ConnectorRuntimeException onsEx = this.checkProducerException(cloudEvent, e);
- OnExceptionContext context = new OnExceptionContext();
- context.setTopic(cloudEvent.getSubject());
- context.setException(onsEx);
- sendCallback.onException(context);
- }
+ this.pulsarClient.publish(cloudEvent, sendCallback);
}
public void init(Properties properties) {
@@ -81,39 +51,25 @@ public class ProducerImpl extends AbstractProducer {
}
public void start() {
- try {
- this.started.compareAndSet(false, true);
- ClientBuilder clientBuilder = PulsarClient.builder()
- .serviceUrl(clientConfiguration.getServiceAddr());
-
- if (clientConfiguration.getAuthPlugin() != null) {
- Preconditions.checkNotNull(clientConfiguration.getAuthParams(),
- "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
- clientBuilder.authentication(
- clientConfiguration.getAuthPlugin(),
- clientConfiguration.getAuthParams()
- );
- }
-
- this.pulsarClient = clientBuilder.build();
- } catch (Exception ignored) {
- // ignored
- }
+ this.started.compareAndSet(false, true);
+ this.pulsarClient = new PulsarClientWrapper(config);
}
public void shutdown() {
try {
this.started.compareAndSet(true, false);
- this.pulsarClient.close();
+ this.pulsarClient.shutdown();
} catch (Exception ignored) {
// ignored
}
}
+ @Override
public boolean isStarted() {
return this.started.get();
}
+ @Override
public boolean isClosed() {
return !this.isStarted();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org