You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/12 13:12:18 UTC
[camel] 01/01: CAMEL-16030: camel-pulsar - Add async send to producer
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch CAMEL-16030/async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5bf94a8f578a867531fb2f743c053b74e338f317
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Wed Oct 12 15:11:53 2022 +0200
CAMEL-16030: camel-pulsar - Add async send to producer
---
.../component/pulsar/PulsarConfiguration.java | 5 +-
.../camel/component/pulsar/PulsarProducer.java | 145 +++++++++++++--------
2 files changed, 96 insertions(+), 54 deletions(-)
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 1d9aa490e71..5fe9db9f06f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -312,12 +312,15 @@ public class PulsarConfiguration implements Cloneable {
}
/**
- * Set the number of max pending messages across all the partitions. Default is 50000.
+ * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and
+ * will be removed in future version.
*/
+ @Deprecated
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
}
+ @Deprecated
public int getMaxPendingMessagesAcrossPartitions() {
return maxPendingMessagesAcrossPartitions;
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 52dcdc14dfb..100340eef2e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,30 +16,35 @@
*/
package org.apache.camel.component.pulsar;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConversionException;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PulsarProducer extends DefaultProducer {
+public class PulsarProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);
+ private final Object mutex = new Object();
private final PulsarEndpoint pulsarEndpoint;
- private Producer<byte[]> producer;
+ private volatile Producer<byte[]> producer;
public PulsarProducer(PulsarEndpoint pulsarEndpoint) {
super(pulsarEndpoint);
@@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer {
}
@Override
- public void process(final Exchange exchange) throws Exception {
- final Message message = exchange.getIn();
-
- TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
- byte[] body;
+ public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- body = exchange.getContext().getTypeConverter()
- .mandatoryConvertTo(byte[].class, exchange, message.getBody());
- } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
- // fallback to try to serialize the data
- body = PulsarMessageUtils.serialize(message.getBody());
- }
- messageBuilder.value(body);
+ final Message message = exchange.getIn();
+ byte[] body = serialize(exchange, message.getBody());
+ TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+ messageBuilder.value(body);
- String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
- if (ObjectHelper.isNotEmpty(key)) {
- messageBuilder.key(key);
- }
+ String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+ if (ObjectHelper.isNotEmpty(key)) {
+ messageBuilder.key(key);
+ }
- Map<String, String> properties
- = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
- if (ObjectHelper.isNotEmpty(properties)) {
- messageBuilder.properties(properties);
- }
+ Map<String, String> properties
+ = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+ if (ObjectHelper.isNotEmpty(properties)) {
+ messageBuilder.properties(properties);
+ }
- Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
- if (eventTime != null) {
- messageBuilder.eventTime(eventTime);
+ Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+ if (eventTime != null) {
+ messageBuilder.eventTime(eventTime);
+ }
+
+ messageBuilder.sendAsync()
+ .thenAccept(r -> exchange.getIn().setBody(r))
+ .whenComplete(
+ (r, e) -> {
+ try {
+ if (e != null) {
+ exchange.setException(new CamelExchangeException(
+ "An error occurred while sending a message to pulsar", exchange, e));
+ }
+ } finally {
+ callback.done(false);
+ }
+ });
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
}
+ return false;
+ }
- messageBuilder.send();
+ /**
+ * Serialize the given content using the appropriate converter if any, otherwise it relies on
+ * {@link PulsarMessageUtils#serialize(Object)}.
+ *
+ * @param exchange the exchange used as context for the serialization process.
+ * @param content the content to serialize.
+ * @return the serialized counterpart of the given content
+ * @throws IOException if an error occurs while serializing the content.
+ */
+ private static byte[] serialize(Exchange exchange, Object content) throws IOException {
+ byte[] result;
+ try {
+ result = exchange.getContext().getTypeConverter()
+ .mandatoryConvertTo(byte[].class, exchange, content);
+ } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
+ // fallback to try to serialize the data
+ result = PulsarMessageUtils.serialize(content);
+ }
+ return result;
}
- private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
- if (producer == null) {
- final String topicUri = pulsarEndpoint.getUri();
- PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
- String producerName = configuration.getProducerName();
- final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
- .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
- .blockIfQueueFull(configuration.isBlockIfQueueFull())
- .maxPendingMessages(configuration.getMaxPendingMessages())
- .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
- .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
- .batchingMaxMessages(configuration.getMaxPendingMessages())
- .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
- .initialSequenceId(configuration.getInitialSequenceId())
- .compressionType(configuration.getCompressionType());
- if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
- producerBuilder.messageRouter(configuration.getMessageRouter());
- } else {
- producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
- }
- if (producerName != null) {
- producerBuilder.producerName(producerName);
+ private void createProducer() throws PulsarClientException {
+ synchronized (mutex) {
+ if (producer == null) {
+ final String topicUri = pulsarEndpoint.getUri();
+ PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+ String producerName = configuration.getProducerName();
+ final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
+ .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+ .blockIfQueueFull(configuration.isBlockIfQueueFull())
+ .maxPendingMessages(configuration.getMaxPendingMessages())
+ .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+ .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+ .batchingMaxMessages(configuration.getMaxPendingMessages())
+ .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+ .initialSequenceId(configuration.getInitialSequenceId())
+ .compressionType(configuration.getCompressionType());
+ if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+ producerBuilder.messageRouter(configuration.getMessageRouter());
+ } else {
+ producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+ }
+ if (producerName != null) {
+ producerBuilder.producerName(producerName);
+ }
+ producer = producerBuilder.create();
}
- producer = producerBuilder.create();
}
}
@Override
protected void doStart() throws Exception {
- LOG.debug("Starting producer: {}", this);
+ LOG.debug("Starting the pulsar producer: {}", this);
if (producer == null) {
createProducer();
}
@@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer {
@Override
protected void doStop() throws Exception {
- LOG.debug("Stopping producer: {}", this);
+ LOG.debug("Stopping the pulsar producer: {}", this);
if (producer != null) {
producer.close();
producer = null;