You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/12 13:12:18 UTC

[camel] 01/01: CAMEL-16030: camel-pulsar - Add async send to producer

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-16030/async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5bf94a8f578a867531fb2f743c053b74e338f317
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Wed Oct 12 15:11:53 2022 +0200

    CAMEL-16030: camel-pulsar - Add async send to producer
---
 .../component/pulsar/PulsarConfiguration.java      |   5 +-
 .../camel/component/pulsar/PulsarProducer.java     | 145 +++++++++++++--------
 2 files changed, 96 insertions(+), 54 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 1d9aa490e71..5fe9db9f06f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -312,12 +312,15 @@ public class PulsarConfiguration implements Cloneable {
     }
 
     /**
-     * Set the number of max pending messages across all the partitions. Default is 50000.
+     * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and
+     * will be removed in a future version.
      */
+    @Deprecated
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
         this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
     }
 
+    @Deprecated
     public int getMaxPendingMessagesAcrossPartitions() {
         return maxPendingMessagesAcrossPartitions;
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 52dcdc14dfb..100340eef2e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,30 +16,35 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PulsarProducer extends DefaultProducer {
+public class PulsarProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);
 
+    private final Object mutex = new Object();
     private final PulsarEndpoint pulsarEndpoint;
-    private Producer<byte[]> producer;
+    private volatile Producer<byte[]> producer;
 
     public PulsarProducer(PulsarEndpoint pulsarEndpoint) {
         super(pulsarEndpoint);
@@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(final Exchange exchange) throws Exception {
-        final Message message = exchange.getIn();
-
-        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
-        byte[] body;
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            body = exchange.getContext().getTypeConverter()
-                    .mandatoryConvertTo(byte[].class, exchange, message.getBody());
-        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
-            // fallback to try to serialize the data
-            body = PulsarMessageUtils.serialize(message.getBody());
-        }
-        messageBuilder.value(body);
+            final Message message = exchange.getIn();
+            byte[] body = serialize(exchange, message.getBody());
+            TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+            messageBuilder.value(body);
 
-        String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
-        if (ObjectHelper.isNotEmpty(key)) {
-            messageBuilder.key(key);
-        }
+            String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+            if (ObjectHelper.isNotEmpty(key)) {
+                messageBuilder.key(key);
+            }
 
-        Map<String, String> properties
-                = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
-        if (ObjectHelper.isNotEmpty(properties)) {
-            messageBuilder.properties(properties);
-        }
+            Map<String, String> properties
+                    = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+            if (ObjectHelper.isNotEmpty(properties)) {
+                messageBuilder.properties(properties);
+            }
 
-        Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
-        if (eventTime != null) {
-            messageBuilder.eventTime(eventTime);
+            Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+            if (eventTime != null) {
+                messageBuilder.eventTime(eventTime);
+            }
+
+            messageBuilder.sendAsync()
+                    .thenAccept(r -> exchange.getIn().setBody(r))
+                    .whenComplete(
+                            (r, e) -> {
+                                try {
+                                    if (e != null) {
+                                        exchange.setException(new CamelExchangeException(
+                                                "An error occurred while sending a message to pulsar", exchange, e));
+                                    }
+                                } finally {
+                                    callback.done(false);
+                                }
+                            });
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
+        return false;
+    }
 
-        messageBuilder.send();
+    /**
+     * Serializes the given content using the type converter of the Camel context, falling back to
+     * {@link PulsarMessageUtils#serialize(Object)} when no conversion is available or the conversion fails.
+     *
+     * @param  exchange    the exchange used as context for the serialization process.
+     * @param  content     the content to serialize.
+     * @return             the serialized counterpart of the given content
+     * @throws IOException if an error occurs while serializing the content.
+     */
+    private static byte[] serialize(Exchange exchange, Object content) throws IOException {
+        byte[] result;
+        try {
+            result = exchange.getContext().getTypeConverter()
+                    .mandatoryConvertTo(byte[].class, exchange, content);
+        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
+            // fallback to try to serialize the data
+            result = PulsarMessageUtils.serialize(content);
+        }
+        return result;
     }
 
-    private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
-        if (producer == null) {
-            final String topicUri = pulsarEndpoint.getUri();
-            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
-            String producerName = configuration.getProducerName();
-            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
-                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
-                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
-                    .maxPendingMessages(configuration.getMaxPendingMessages())
-                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
-                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
-                    .batchingMaxMessages(configuration.getMaxPendingMessages())
-                    .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
-                    .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
-            if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
-                producerBuilder.messageRouter(configuration.getMessageRouter());
-            } else {
-                producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
-            }
-            if (producerName != null) {
-                producerBuilder.producerName(producerName);
+    private void createProducer() throws PulsarClientException {
+        synchronized (mutex) {
+            if (producer == null) {
+                final String topicUri = pulsarEndpoint.getUri();
+                PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+                String producerName = configuration.getProducerName();
+                final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
+                        .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                        .maxPendingMessages(configuration.getMaxPendingMessages())
+                        .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                        .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                        .batchingMaxMessages(configuration.getMaxPendingMessages())
+                        .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+                        .initialSequenceId(configuration.getInitialSequenceId())
+                        .compressionType(configuration.getCompressionType());
+                if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+                    producerBuilder.messageRouter(configuration.getMessageRouter());
+                } else {
+                    producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+                }
+                if (producerName != null) {
+                    producerBuilder.producerName(producerName);
+                }
+                producer = producerBuilder.create();
             }
-            producer = producerBuilder.create();
         }
     }
 
     @Override
     protected void doStart() throws Exception {
-        LOG.debug("Starting producer: {}", this);
+        LOG.debug("Starting the pulsar producer: {}", this);
         if (producer == null) {
             createProducer();
         }
@@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
-        LOG.debug("Stopping producer: {}", this);
+        LOG.debug("Stopping the pulsar producer: {}", this);
         if (producer != null) {
             producer.close();
             producer = null;