You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/12 15:04:34 UTC
[camel] branch main updated: CAMEL-16030: camel-pulsar - Add async send to producer (#8529)
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ae2f8e250c5 CAMEL-16030: camel-pulsar - Add async send to producer (#8529)
ae2f8e250c5 is described below
commit ae2f8e250c5ab7210e1b3ff5ee06aa68e28de2c4
Author: Nicolas Filotto <es...@users.noreply.github.com>
AuthorDate: Wed Oct 12 17:04:27 2022 +0200
CAMEL-16030: camel-pulsar - Add async send to producer (#8529)
## Motivation
The producer is synchronous today. But pulsar allows sending asynchronously, where a `CompletableFuture` is returned. We can leverage this for async send, and call `AsyncCallback` from the future.
## Modifications:
* Make `PulsarProducer` extend `DefaultAsyncProducer` and implement the corresponding `process` method
* Ensure the thread safety of the producer initialization (not directly related to the initial issue)
* Deprecate the option `maxPendingMessagesAcrossPartitions` as it is deprecated in the pulsar client
---
.../org/apache/camel/component/pulsar/pulsar.json | 4 +-
.../component/pulsar/PulsarConfiguration.java | 6 +-
.../camel/component/pulsar/PulsarProducer.java | 145 +++++++++++++--------
3 files changed, 99 insertions(+), 56 deletions(-)
diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index b95da9667fe..183bf83b416 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -57,7 +57,7 @@
"initialSequenceId": { "kind": "property", "displayName": "Initial Sequence Id", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The first message published will have a sequence Id of initialSequenceId 1." },
"lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...]
"maxPendingMessages": { "kind": "property", "displayName": "Max Pending Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Size of the pending massages queue. When the queue is full, by default, any further sends wi [...]
- "maxPendingMessagesAcrossPartitions": { "kind": "property", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum number of pending messages for partitioned to [...]
+ "maxPendingMessagesAcrossPartitions": { "kind": "property", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum number of pending messages for partitioned top [...]
"messageRouter": { "kind": "property", "displayName": "Message Router", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRouter", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Custom Message Router to use" },
"messageRoutingMode": { "kind": "property", "displayName": "Message Routing Mode", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRoutingMode", "enum": [ "SinglePartition", "RoundRobinPartition", "CustomPartition" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "RoundRobinPartition", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configuration [...]
"producerName": { "kind": "property", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." },
@@ -121,7 +121,7 @@
"compressionType": { "kind": "parameter", "displayName": "Compression Type", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.CompressionType", "enum": [ "NONE", "LZ4", "ZLIB", "ZSTD", "SNAPPY" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "NONE", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description" [...]
"initialSequenceId": { "kind": "parameter", "displayName": "Initial Sequence Id", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The first message published will have a sequence Id of initialSequenceId 1." },
"maxPendingMessages": { "kind": "parameter", "displayName": "Max Pending Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Size of the pending massages queue. When the queue is full, by default, any further s [...]
- "maxPendingMessagesAcrossPartitions": { "kind": "parameter", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum number of pending messages for partiti [...]
+ "maxPendingMessagesAcrossPartitions": { "kind": "parameter", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum number of pending messages for partitio [...]
"messageRouter": { "kind": "parameter", "displayName": "Message Router", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRouter", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Custom Message Router to use" },
"messageRoutingMode": { "kind": "parameter", "displayName": "Message Routing Mode", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRoutingMode", "enum": [ "SinglePartition", "RoundRobinPartition", "CustomPartition" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "RoundRobinPartition", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configuratio [...]
"producerName": { "kind": "parameter", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." },
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 1d9aa490e71..fb5695ce294 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -102,6 +102,7 @@ public class PulsarConfiguration implements Cloneable {
description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
+ "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.",
defaultValue = "50000")
+ @Deprecated
private int maxPendingMessagesAcrossPartitions = 50000;
@UriParam(label = "producer",
description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.",
@@ -312,12 +313,15 @@ public class PulsarConfiguration implements Cloneable {
}
/**
- * Set the number of max pending messages across all the partitions. Default is 50000.
+ * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and
+ * will be removed in a future version.
*/
+ @Deprecated
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
}
+ @Deprecated
public int getMaxPendingMessagesAcrossPartitions() {
return maxPendingMessagesAcrossPartitions;
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 52dcdc14dfb..100340eef2e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,30 +16,35 @@
*/
package org.apache.camel.component.pulsar;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConversionException;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PulsarProducer extends DefaultProducer {
+public class PulsarProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);
+ private final Object mutex = new Object();
private final PulsarEndpoint pulsarEndpoint;
- private Producer<byte[]> producer;
+ private volatile Producer<byte[]> producer;
public PulsarProducer(PulsarEndpoint pulsarEndpoint) {
super(pulsarEndpoint);
@@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer {
}
@Override
- public void process(final Exchange exchange) throws Exception {
- final Message message = exchange.getIn();
-
- TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
- byte[] body;
+ public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- body = exchange.getContext().getTypeConverter()
- .mandatoryConvertTo(byte[].class, exchange, message.getBody());
- } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
- // fallback to try to serialize the data
- body = PulsarMessageUtils.serialize(message.getBody());
- }
- messageBuilder.value(body);
+ final Message message = exchange.getIn();
+ byte[] body = serialize(exchange, message.getBody());
+ TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+ messageBuilder.value(body);
- String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
- if (ObjectHelper.isNotEmpty(key)) {
- messageBuilder.key(key);
- }
+ String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+ if (ObjectHelper.isNotEmpty(key)) {
+ messageBuilder.key(key);
+ }
- Map<String, String> properties
- = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
- if (ObjectHelper.isNotEmpty(properties)) {
- messageBuilder.properties(properties);
- }
+ Map<String, String> properties
+ = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+ if (ObjectHelper.isNotEmpty(properties)) {
+ messageBuilder.properties(properties);
+ }
- Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
- if (eventTime != null) {
- messageBuilder.eventTime(eventTime);
+ Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+ if (eventTime != null) {
+ messageBuilder.eventTime(eventTime);
+ }
+
+ messageBuilder.sendAsync()
+ .thenAccept(r -> exchange.getIn().setBody(r))
+ .whenComplete(
+ (r, e) -> {
+ try {
+ if (e != null) {
+ exchange.setException(new CamelExchangeException(
+ "An error occurred while sending a message to pulsar", exchange, e));
+ }
+ } finally {
+ callback.done(false);
+ }
+ });
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
}
+ return false;
+ }
- messageBuilder.send();
+ /**
+ * Serialize the given content using the appropriate converter if any, otherwise it relies on
+ * {@link PulsarMessageUtils#serialize(Object)}.
+ *
+ * @param exchange the exchange used as context for the serialization process.
+ * @param content the content to serialize.
+ * @return the serialized counterpart of the given content
+ * @throws IOException if an error occurs while serializing the content.
+ */
+ private static byte[] serialize(Exchange exchange, Object content) throws IOException {
+ byte[] result;
+ try {
+ result = exchange.getContext().getTypeConverter()
+ .mandatoryConvertTo(byte[].class, exchange, content);
+ } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
+ // fallback to try to serialize the data
+ result = PulsarMessageUtils.serialize(content);
+ }
+ return result;
}
- private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
- if (producer == null) {
- final String topicUri = pulsarEndpoint.getUri();
- PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
- String producerName = configuration.getProducerName();
- final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
- .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
- .blockIfQueueFull(configuration.isBlockIfQueueFull())
- .maxPendingMessages(configuration.getMaxPendingMessages())
- .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
- .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
- .batchingMaxMessages(configuration.getMaxPendingMessages())
- .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
- .initialSequenceId(configuration.getInitialSequenceId())
- .compressionType(configuration.getCompressionType());
- if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
- producerBuilder.messageRouter(configuration.getMessageRouter());
- } else {
- producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
- }
- if (producerName != null) {
- producerBuilder.producerName(producerName);
+ private void createProducer() throws PulsarClientException {
+ synchronized (mutex) {
+ if (producer == null) {
+ final String topicUri = pulsarEndpoint.getUri();
+ PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+ String producerName = configuration.getProducerName();
+ final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
+ .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+ .blockIfQueueFull(configuration.isBlockIfQueueFull())
+ .maxPendingMessages(configuration.getMaxPendingMessages())
+ .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+ .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+ .batchingMaxMessages(configuration.getMaxPendingMessages())
+ .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+ .initialSequenceId(configuration.getInitialSequenceId())
+ .compressionType(configuration.getCompressionType());
+ if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+ producerBuilder.messageRouter(configuration.getMessageRouter());
+ } else {
+ producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+ }
+ if (producerName != null) {
+ producerBuilder.producerName(producerName);
+ }
+ producer = producerBuilder.create();
}
- producer = producerBuilder.create();
}
}
@Override
protected void doStart() throws Exception {
- LOG.debug("Starting producer: {}", this);
+ LOG.debug("Starting the pulsar producer: {}", this);
if (producer == null) {
createProducer();
}
@@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer {
@Override
protected void doStop() throws Exception {
- LOG.debug("Stopping producer: {}", this);
+ LOG.debug("Stopping the pulsar producer: {}", this);
if (producer != null) {
producer.close();
producer = null;