You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/12 13:12:17 UTC

[camel] branch CAMEL-16030/async-producer created (now 5bf94a8f578)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch CAMEL-16030/async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 5bf94a8f578 CAMEL-16030: camel-pulsar - Add async send to producer

This branch includes the following new commits:

     new 5bf94a8f578 CAMEL-16030: camel-pulsar - Add async send to producer

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-16030: camel-pulsar - Add async send to producer

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-16030/async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5bf94a8f578a867531fb2f743c053b74e338f317
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Wed Oct 12 15:11:53 2022 +0200

    CAMEL-16030: camel-pulsar - Add async send to producer
---
 .../component/pulsar/PulsarConfiguration.java      |   5 +-
 .../camel/component/pulsar/PulsarProducer.java     | 145 +++++++++++++--------
 2 files changed, 96 insertions(+), 54 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 1d9aa490e71..5fe9db9f06f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -312,12 +312,15 @@ public class PulsarConfiguration implements Cloneable {
     }
 
     /**
-     * Set the number of max pending messages across all the partitions. Default is 50000.
+     * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and
+     * will be removed in a future version.
      */
+    @Deprecated
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
         this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
     }
 
+    @Deprecated
     public int getMaxPendingMessagesAcrossPartitions() {
         return maxPendingMessagesAcrossPartitions;
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 52dcdc14dfb..100340eef2e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,30 +16,35 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PulsarProducer extends DefaultProducer {
+public class PulsarProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);
 
+    private final Object mutex = new Object();
     private final PulsarEndpoint pulsarEndpoint;
-    private Producer<byte[]> producer;
+    private volatile Producer<byte[]> producer;
 
     public PulsarProducer(PulsarEndpoint pulsarEndpoint) {
         super(pulsarEndpoint);
@@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(final Exchange exchange) throws Exception {
-        final Message message = exchange.getIn();
-
-        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
-        byte[] body;
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            body = exchange.getContext().getTypeConverter()
-                    .mandatoryConvertTo(byte[].class, exchange, message.getBody());
-        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
-            // fallback to try to serialize the data
-            body = PulsarMessageUtils.serialize(message.getBody());
-        }
-        messageBuilder.value(body);
+            final Message message = exchange.getIn();
+            byte[] body = serialize(exchange, message.getBody());
+            TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+            messageBuilder.value(body);
 
-        String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
-        if (ObjectHelper.isNotEmpty(key)) {
-            messageBuilder.key(key);
-        }
+            String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+            if (ObjectHelper.isNotEmpty(key)) {
+                messageBuilder.key(key);
+            }
 
-        Map<String, String> properties
-                = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
-        if (ObjectHelper.isNotEmpty(properties)) {
-            messageBuilder.properties(properties);
-        }
+            Map<String, String> properties
+                    = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+            if (ObjectHelper.isNotEmpty(properties)) {
+                messageBuilder.properties(properties);
+            }
 
-        Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
-        if (eventTime != null) {
-            messageBuilder.eventTime(eventTime);
+            Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+            if (eventTime != null) {
+                messageBuilder.eventTime(eventTime);
+            }
+
+            messageBuilder.sendAsync()
+                    .thenAccept(r -> exchange.getIn().setBody(r))
+                    .whenComplete(
+                            (r, e) -> {
+                                try {
+                                    if (e != null) {
+                                        exchange.setException(new CamelExchangeException(
+                                                "An error occurred while sending a message to pulsar", exchange, e));
+                                    }
+                                } finally {
+                                    callback.done(false);
+                                }
+                            });
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
+        return false;
+    }
 
-        messageBuilder.send();
+    /**
+     * Serialize the given content using the appropriate converter if any, otherwise it relies on
+     * {@link PulsarMessageUtils#serialize(Object)}.
+     *
+     * @param  exchange    the exchange used as context for the serialization process.
+     * @param  content     the content to serialize.
+     * @return             the serialized counterpart of the given content
+     * @throws IOException if an error occurs while serializing the content.
+     */
+    private static byte[] serialize(Exchange exchange, Object content) throws IOException {
+        byte[] result;
+        try {
+            result = exchange.getContext().getTypeConverter()
+                    .mandatoryConvertTo(byte[].class, exchange, content);
+        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
+            // fallback to try to serialize the data
+            result = PulsarMessageUtils.serialize(content);
+        }
+        return result;
     }
 
-    private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
-        if (producer == null) {
-            final String topicUri = pulsarEndpoint.getUri();
-            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
-            String producerName = configuration.getProducerName();
-            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
-                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
-                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
-                    .maxPendingMessages(configuration.getMaxPendingMessages())
-                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
-                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
-                    .batchingMaxMessages(configuration.getMaxPendingMessages())
-                    .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
-                    .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
-            if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
-                producerBuilder.messageRouter(configuration.getMessageRouter());
-            } else {
-                producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
-            }
-            if (producerName != null) {
-                producerBuilder.producerName(producerName);
+    private void createProducer() throws PulsarClientException {
+        synchronized (mutex) {
+            if (producer == null) {
+                final String topicUri = pulsarEndpoint.getUri();
+                PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+                String producerName = configuration.getProducerName();
+                final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
+                        .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                        .maxPendingMessages(configuration.getMaxPendingMessages())
+                        .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                        .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                        .batchingMaxMessages(configuration.getMaxPendingMessages())
+                        .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+                        .initialSequenceId(configuration.getInitialSequenceId())
+                        .compressionType(configuration.getCompressionType());
+                if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+                    producerBuilder.messageRouter(configuration.getMessageRouter());
+                } else {
+                    producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+                }
+                if (producerName != null) {
+                    producerBuilder.producerName(producerName);
+                }
+                producer = producerBuilder.create();
             }
-            producer = producerBuilder.create();
         }
     }
 
     @Override
     protected void doStart() throws Exception {
-        LOG.debug("Starting producer: {}", this);
+        LOG.debug("Starting the pulsar producer: {}", this);
         if (producer == null) {
             createProducer();
         }
@@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
-        LOG.debug("Stopping producer: {}", this);
+        LOG.debug("Stopping the pulsar producer: {}", this);
         if (producer != null) {
             producer.close();
             producer = null;