You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/12 15:04:34 UTC

[camel] branch main updated: CAMEL-16030: camel-pulsar - Add async send to producer (#8529)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ae2f8e250c5 CAMEL-16030: camel-pulsar - Add async send to producer (#8529)
ae2f8e250c5 is described below

commit ae2f8e250c5ab7210e1b3ff5ee06aa68e28de2c4
Author: Nicolas Filotto <es...@users.noreply.github.com>
AuthorDate: Wed Oct 12 17:04:27 2022 +0200

    CAMEL-16030: camel-pulsar - Add async send to producer (#8529)
    
    ## Motivation
    
    The producer is synchronous today. But pulsar allows sending asynchronously, where a `CompletableFuture` is returned. We can leverage this for async send, and call `AsyncCallback` from the future.
    
    ## Modifications:
    
    * Make `PulsarProducer` extend `DefaultAsyncProducer` and implement the corresponding `process` method
    * Ensure the thread safety of the producer initialization (not directly related to the initial issue)
    * Deprecate the option `maxPendingMessagesAcrossPartitions` as it is deprecated in the pulsar client
---
 .../org/apache/camel/component/pulsar/pulsar.json  |   4 +-
 .../component/pulsar/PulsarConfiguration.java      |   6 +-
 .../camel/component/pulsar/PulsarProducer.java     | 145 +++++++++++++--------
 3 files changed, 99 insertions(+), 56 deletions(-)

diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index b95da9667fe..183bf83b416 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -57,7 +57,7 @@
     "initialSequenceId": { "kind": "property", "displayName": "Initial Sequence Id", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The first message published will have a sequence Id of initialSequenceId 1." },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...]
     "maxPendingMessages": { "kind": "property", "displayName": "Max Pending Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Size of the pending massages queue. When the queue is full, by default, any further sends wi [...]
-    "maxPendingMessagesAcrossPartitions": { "kind": "property", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum number of pending messages for partitioned to [...]
+    "maxPendingMessagesAcrossPartitions": { "kind": "property", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum number of pending messages for partitioned top [...]
     "messageRouter": { "kind": "property", "displayName": "Message Router", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRouter", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Custom Message Router to use" },
     "messageRoutingMode": { "kind": "property", "displayName": "Message Routing Mode", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRoutingMode", "enum": [ "SinglePartition", "RoundRobinPartition", "CustomPartition" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "RoundRobinPartition", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configuration [...]
     "producerName": { "kind": "property", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." },
@@ -121,7 +121,7 @@
     "compressionType": { "kind": "parameter", "displayName": "Compression Type", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.CompressionType", "enum": [ "NONE", "LZ4", "ZLIB", "ZSTD", "SNAPPY" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "NONE", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description" [...]
     "initialSequenceId": { "kind": "parameter", "displayName": "Initial Sequence Id", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The first message published will have a sequence Id of initialSequenceId 1." },
     "maxPendingMessages": { "kind": "parameter", "displayName": "Max Pending Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Size of the pending massages queue. When the queue is full, by default, any further s [...]
-    "maxPendingMessagesAcrossPartitions": { "kind": "parameter", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum number of pending messages for partiti [...]
+    "maxPendingMessagesAcrossPartitions": { "kind": "parameter", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum number of pending messages for partitio [...]
     "messageRouter": { "kind": "parameter", "displayName": "Message Router", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRouter", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Custom Message Router to use" },
     "messageRoutingMode": { "kind": "parameter", "displayName": "Message Routing Mode", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRoutingMode", "enum": [ "SinglePartition", "RoundRobinPartition", "CustomPartition" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "RoundRobinPartition", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configuratio [...]
     "producerName": { "kind": "parameter", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." },
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 1d9aa490e71..fb5695ce294 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -102,6 +102,7 @@ public class PulsarConfiguration implements Cloneable {
               description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
                             + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.",
               defaultValue = "50000")
+    @Deprecated
     private int maxPendingMessagesAcrossPartitions = 50000;
     @UriParam(label = "producer",
               description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.",
@@ -312,12 +313,15 @@ public class PulsarConfiguration implements Cloneable {
     }
 
     /**
-     * Set the number of max pending messages across all the partitions. Default is 50000.
+     * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and
+     * will be removed in a future version.
      */
+    @Deprecated
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
         this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
     }
 
+    @Deprecated
     public int getMaxPendingMessagesAcrossPartitions() {
         return maxPendingMessagesAcrossPartitions;
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 52dcdc14dfb..100340eef2e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,30 +16,35 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PulsarProducer extends DefaultProducer {
+public class PulsarProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);
 
+    private final Object mutex = new Object();
     private final PulsarEndpoint pulsarEndpoint;
-    private Producer<byte[]> producer;
+    private volatile Producer<byte[]> producer;
 
     public PulsarProducer(PulsarEndpoint pulsarEndpoint) {
         super(pulsarEndpoint);
@@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(final Exchange exchange) throws Exception {
-        final Message message = exchange.getIn();
-
-        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
-        byte[] body;
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            body = exchange.getContext().getTypeConverter()
-                    .mandatoryConvertTo(byte[].class, exchange, message.getBody());
-        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
-            // fallback to try to serialize the data
-            body = PulsarMessageUtils.serialize(message.getBody());
-        }
-        messageBuilder.value(body);
+            final Message message = exchange.getIn();
+            byte[] body = serialize(exchange, message.getBody());
+            TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+            messageBuilder.value(body);
 
-        String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
-        if (ObjectHelper.isNotEmpty(key)) {
-            messageBuilder.key(key);
-        }
+            String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+            if (ObjectHelper.isNotEmpty(key)) {
+                messageBuilder.key(key);
+            }
 
-        Map<String, String> properties
-                = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
-        if (ObjectHelper.isNotEmpty(properties)) {
-            messageBuilder.properties(properties);
-        }
+            Map<String, String> properties
+                    = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+            if (ObjectHelper.isNotEmpty(properties)) {
+                messageBuilder.properties(properties);
+            }
 
-        Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
-        if (eventTime != null) {
-            messageBuilder.eventTime(eventTime);
+            Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+            if (eventTime != null) {
+                messageBuilder.eventTime(eventTime);
+            }
+
+            messageBuilder.sendAsync()
+                    .thenAccept(r -> exchange.getIn().setBody(r))
+                    .whenComplete(
+                            (r, e) -> {
+                                try {
+                                    if (e != null) {
+                                        exchange.setException(new CamelExchangeException(
+                                                "An error occurred while sending a message to pulsar", exchange, e));
+                                    }
+                                } finally {
+                                    callback.done(false);
+                                }
+                            });
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
+        return false;
+    }
 
-        messageBuilder.send();
+    /**
+     * Serialize the given content using the appropriate converter if any, otherwise it relies on
+     * {@link PulsarMessageUtils#serialize(Object)}.
+     *
+     * @param  exchange    the exchange used as context for the serialization process.
+     * @param  content     the content to serialize.
+     * @return             the serialized counterpart of the given content
+     * @throws IOException if an error occurs while serializing the content.
+     */
+    private static byte[] serialize(Exchange exchange, Object content) throws IOException {
+        byte[] result;
+        try {
+            result = exchange.getContext().getTypeConverter()
+                    .mandatoryConvertTo(byte[].class, exchange, content);
+        } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
+            // fallback to try to serialize the data
+            result = PulsarMessageUtils.serialize(content);
+        }
+        return result;
     }
 
-    private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
-        if (producer == null) {
-            final String topicUri = pulsarEndpoint.getUri();
-            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
-            String producerName = configuration.getProducerName();
-            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
-                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
-                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
-                    .maxPendingMessages(configuration.getMaxPendingMessages())
-                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
-                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
-                    .batchingMaxMessages(configuration.getMaxPendingMessages())
-                    .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
-                    .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
-            if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
-                producerBuilder.messageRouter(configuration.getMessageRouter());
-            } else {
-                producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
-            }
-            if (producerName != null) {
-                producerBuilder.producerName(producerName);
+    private void createProducer() throws PulsarClientException {
+        synchronized (mutex) {
+            if (producer == null) {
+                final String topicUri = pulsarEndpoint.getUri();
+                PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+                String producerName = configuration.getProducerName();
+                final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
+                        .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                        .maxPendingMessages(configuration.getMaxPendingMessages())
+                        .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                        .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                        .batchingMaxMessages(configuration.getMaxPendingMessages())
+                        .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+                        .initialSequenceId(configuration.getInitialSequenceId())
+                        .compressionType(configuration.getCompressionType());
+                if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+                    producerBuilder.messageRouter(configuration.getMessageRouter());
+                } else {
+                    producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+                }
+                if (producerName != null) {
+                    producerBuilder.producerName(producerName);
+                }
+                producer = producerBuilder.create();
             }
-            producer = producerBuilder.create();
         }
     }
 
     @Override
     protected void doStart() throws Exception {
-        LOG.debug("Starting producer: {}", this);
+        LOG.debug("Starting the pulsar producer: {}", this);
         if (producer == null) {
             createProducer();
         }
@@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
-        LOG.debug("Stopping producer: {}", this);
+        LOG.debug("Stopping the pulsar producer: {}", this);
         if (producer != null) {
             producer.close();
             producer = null;