You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/12 13:13:19 UTC

[camel] branch CAMEL-16030/async-producer updated (5bf94a8f578 -> e58c997431a)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch CAMEL-16030/async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git


 discard 5bf94a8f578 CAMEL-16030: camel-pulsar - Add async send to producer
     new e58c997431a CAMEL-16030: camel-pulsar - Add async send to producer

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (5bf94a8f578)
            \
             N -- N -- N   refs/heads/CAMEL-16030/async-producer (e58c997431a)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/camel/component/pulsar/PulsarConfiguration.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[camel] 01/01: CAMEL-16030: camel-pulsar - Add async send to producer

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-16030/async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e58c997431a120a5d8b45d291a80d30955298977
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Wed Oct 12 15:11:53 2022 +0200

    CAMEL-16030: camel-pulsar - Add async send to producer
---
 .../component/pulsar/PulsarConfiguration.java      |   5 +-
 .../camel/component/pulsar/PulsarProducer.java     | 145 +++++++++++++--------
 2 files changed, 96 insertions(+), 54 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 1d9aa490e71..8c475284911 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -312,12 +312,15 @@ public class PulsarConfiguration implements Cloneable {
     }
 
     /**
-     * Set the number of max pending messages across all the partitions. Default is 50000.
+     * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and
+     * will be removed in a future version.
      */
+    @Deprecated
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
         this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
     }
 
+    @Deprecated
     public int getMaxPendingMessagesAcrossPartitions() {
         return maxPendingMessagesAcrossPartitions;
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 52dcdc14dfb..100340eef2e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,30 +16,35 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PulsarProducer extends DefaultProducer {
+public class PulsarProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);
 
+    private final Object mutex = new Object();
     private final PulsarEndpoint pulsarEndpoint;
-    private Producer<byte[]> producer;
+    private volatile Producer<byte[]> producer;
 
     public PulsarProducer(PulsarEndpoint pulsarEndpoint) {
         super(pulsarEndpoint);
@@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(final Exchange exchange) throws Exception {
-        final Message message = exchange.getIn();
-
-        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
-        byte[] body;
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            body = exchange.getContext().getTypeConverter()
-                    .mandatoryConvertTo(byte[].class, exchange, message.getBody());
-        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
-            // fallback to try to serialize the data
-            body = PulsarMessageUtils.serialize(message.getBody());
-        }
-        messageBuilder.value(body);
+            final Message message = exchange.getIn();
+            byte[] body = serialize(exchange, message.getBody());
+            TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+            messageBuilder.value(body);
 
-        String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
-        if (ObjectHelper.isNotEmpty(key)) {
-            messageBuilder.key(key);
-        }
+            String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+            if (ObjectHelper.isNotEmpty(key)) {
+                messageBuilder.key(key);
+            }
 
-        Map<String, String> properties
-                = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
-        if (ObjectHelper.isNotEmpty(properties)) {
-            messageBuilder.properties(properties);
-        }
+            Map<String, String> properties
+                    = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+            if (ObjectHelper.isNotEmpty(properties)) {
+                messageBuilder.properties(properties);
+            }
 
-        Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
-        if (eventTime != null) {
-            messageBuilder.eventTime(eventTime);
+            Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+            if (eventTime != null) {
+                messageBuilder.eventTime(eventTime);
+            }
+
+            messageBuilder.sendAsync()
+                    .thenAccept(r -> exchange.getIn().setBody(r))
+                    .whenComplete(
+                            (r, e) -> {
+                                try {
+                                    if (e != null) {
+                                        exchange.setException(new CamelExchangeException(
+                                                "An error occurred while sending a message to pulsar", exchange, e));
+                                    }
+                                } finally {
+                                    callback.done(false);
+                                }
+                            });
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
+        return false;
+    }
 
-        messageBuilder.send();
+    /**
+     * Serialize the given content using the appropriate converter if any, otherwise it relies on
+     * {@link PulsarMessageUtils#serialize(Object)}.
+     *
+     * @param  exchange    the exchange used as context for the serialization process.
+     * @param  content     the content to serialize.
+     * @return             the serialized counterpart of the given content
+     * @throws IOException if an error occurs while serializing the content.
+     */
+    private static byte[] serialize(Exchange exchange, Object content) throws IOException {
+        byte[] result;
+        try {
+            result = exchange.getContext().getTypeConverter()
+                    .mandatoryConvertTo(byte[].class, exchange, content);
+        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
+            // fallback to try to serialize the data
+            result = PulsarMessageUtils.serialize(content);
+        }
+        return result;
     }
 
-    private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
-        if (producer == null) {
-            final String topicUri = pulsarEndpoint.getUri();
-            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
-            String producerName = configuration.getProducerName();
-            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
-                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
-                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
-                    .maxPendingMessages(configuration.getMaxPendingMessages())
-                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
-                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
-                    .batchingMaxMessages(configuration.getMaxPendingMessages())
-                    .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
-                    .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
-            if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
-                producerBuilder.messageRouter(configuration.getMessageRouter());
-            } else {
-                producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
-            }
-            if (producerName != null) {
-                producerBuilder.producerName(producerName);
+    private void createProducer() throws PulsarClientException {
+        synchronized (mutex) {
+            if (producer == null) {
+                final String topicUri = pulsarEndpoint.getUri();
+                PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+                String producerName = configuration.getProducerName();
+                final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
+                        .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                        .maxPendingMessages(configuration.getMaxPendingMessages())
+                        .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                        .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                        .batchingMaxMessages(configuration.getMaxPendingMessages())
+                        .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+                        .initialSequenceId(configuration.getInitialSequenceId())
+                        .compressionType(configuration.getCompressionType());
+                if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+                    producerBuilder.messageRouter(configuration.getMessageRouter());
+                } else {
+                    producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+                }
+                if (producerName != null) {
+                    producerBuilder.producerName(producerName);
+                }
+                producer = producerBuilder.create();
             }
-            producer = producerBuilder.create();
         }
     }
 
     @Override
     protected void doStart() throws Exception {
-        LOG.debug("Starting producer: {}", this);
+        LOG.debug("Starting the pulsar producer: {}", this);
         if (producer == null) {
             createProducer();
         }
@@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
-        LOG.debug("Stopping producer: {}", this);
+        LOG.debug("Stopping the pulsar producer: {}", this);
         if (producer != null) {
             producer.close();
             producer = null;