You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/12/01 05:54:19 UTC
[camel] branch main updated: CAMEL-17240: cleanup the camel-kafka producer code (#6491)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 0ddc8e3 CAMEL-17240: cleanup the camel-kafka producer code (#6491)
0ddc8e3 is described below
commit 0ddc8e33b0d1035478239875424388a5f676ca6e
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Dec 1 06:53:35 2021 +0100
CAMEL-17240: cleanup the camel-kafka producer code (#6491)
Includes:
- rework the producer to reduce the number of allocations and GC
pressure
- replace AtomicInteger with LongAdder to reduce the risk of contention (with a minor performance improvement)
- logging improvements in the hot path
- rework the callbacks to prevent creating threads unnecessarily
- avoid creating arrays/collections for the delegating callback
- use collection singleton for single-item arrays when possible
- cleanup to use generics in some of the code
- use an assert to check that the internal worker pool is not null, as it seems faster
- do a single pass of the set when iterating over it to get the propagated headers
- cleanup the record iterator to simplify and reduce calls to getHeader
---
.../camel/component/kafka/KafkaProducer.java | 347 ++++++++++++++-------
.../kafka/producer/support/DelegatingCallback.java | 15 +-
.../producer/support/KafkaProducerCallBack.java | 86 +++--
...ack.java => KafkaProducerMetadataCallBack.java} | 23 +-
.../producer/support/KeyValueHolderIterator.java | 117 +++----
.../kafka/producer/support/ProducerUtil.java | 32 ++
6 files changed, 389 insertions(+), 231 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 5560514..a89ecdd 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -18,31 +18,29 @@ package org.apache.camel.component.kafka;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.producer.support.DelegatingCallback;
import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
+import org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCallBack;
import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
+import org.apache.camel.component.kafka.producer.support.ProducerUtil;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
-import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -60,20 +58,29 @@ public class KafkaProducer extends DefaultAsyncProducer {
@SuppressWarnings("rawtypes")
private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint;
+ private final KafkaConfiguration configuration;
private ExecutorService workerPool;
private boolean shutdownWorkerPool;
private volatile boolean closeKafkaProducer;
+ private final String endpointTopic;
+ private final Integer configPartitionKey;
+ private final String configKey;
public KafkaProducer(KafkaEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
+ this.configuration = endpoint.getConfiguration();
+
+ endpointTopic = URISupport.extractRemainderPath(URI.create(endpoint.getEndpointUri()), true);
+ configPartitionKey = configuration.getPartitionKey();
+ configKey = configuration.getKey();
}
Properties getProps() {
- Properties props = endpoint.getConfiguration().createProducerProperties();
+ Properties props = configuration.createProducerProperties();
endpoint.updateClassProperties(props);
- String brokers = endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+ String brokers = endpoint.getKafkaClientFactory().getBrokers(configuration);
if (brokers != null) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
}
@@ -111,7 +118,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
// if we are in asynchronous mode we need a worker pool
- if (!endpoint.getConfiguration().isSynchronous() && workerPool == null) {
+ if (!configuration.isSynchronous() && workerPool == null) {
workerPool = endpoint.createProducerExecutor();
// we create a thread pool so we should also shut it down
shutdownWorkerPool = true;
@@ -143,167 +150,281 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
if (shutdownWorkerPool && workerPool != null) {
- int timeout = endpoint.getConfiguration().getShutdownTimeout();
+ int timeout = configuration.getShutdownTimeout();
LOG.debug("Shutting down Kafka producer worker threads with timeout {} millis", timeout);
endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool, timeout);
workerPool = null;
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception {
- String topic = endpoint.getConfiguration().getTopic();
- Long timeStamp = null;
+ protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(
+ Exchange exchange, Message message) {
+ String topic = evaluateTopic(message);
- // must remove header so its not propagated
- Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
- if (overrideTopic != null) {
- LOG.debug("Using override topic: {}", overrideTopic);
- topic = overrideTopic.toString();
- }
+ // extracting headers which need to be propagated
+ List<Header> propagatedHeaders = getPropagatedHeaders(exchange, message);
- if (topic == null) {
- // if topic property was not received from configuration or header
- // parameters take it from the remaining URI
- topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
- }
+ Object body = message.getBody();
- Object overrideTimeStamp = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+ Iterator<Object> iterator = getObjectIterator(body);
+
+ return new KeyValueHolderIterator(iterator, exchange, configuration, topic, propagatedHeaders);
+ }
+
+ protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message) {
+ String topic = evaluateTopic(message);
+
+ Long timeStamp = null;
+ Object overrideTimeStamp = message.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
if (overrideTimeStamp instanceof Long) {
LOG.debug("Using override TimeStamp: {}", overrideTimeStamp);
timeStamp = (Long) overrideTimeStamp;
}
// extracting headers which need to be propagated
- List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
+ List<Header> propagatedHeaders = getPropagatedHeaders(exchange, message);
- Object msg = exchange.getIn().getBody();
+ final Integer msgPartitionKey = getOverridePartitionKey(message);
+ Object msgKey = getOverrideKey(message);
- // is the message body a list or something that contains multiple values
- Iterator<Object> iterator = null;
- if (msg instanceof Iterable) {
- iterator = ((Iterable<Object>) msg).iterator();
- } else if (msg instanceof Iterator) {
- iterator = (Iterator<Object>) msg;
+ if (msgKey != null) {
+ msgKey = tryConvertToSerializedType(exchange, msgKey, configuration.getKeySerializer());
+ }
+
+ // must convert each entry of the iterator into the value according to
+ // the serializer
+ Object value = tryConvertToSerializedType(exchange, message.getBody(), configuration.getValueSerializer());
+
+ return new ProducerRecord<>(topic, msgPartitionKey, timeStamp, msgKey, value, propagatedHeaders);
+ }
+
+ private Object getOverrideKey(Message message) {
+ if (ObjectHelper.isEmpty(configKey)) {
+ return message.getHeader(KafkaConstants.KEY);
}
- if (iterator != null) {
- final Iterator<Object> msgList = iterator;
- final String msgTopic = topic;
- return new KeyValueHolderIterator(msgList, exchange, endpoint.getConfiguration(), msgTopic, propagatedHeaders);
+ return configKey;
+ }
+
+ private Integer getOverridePartitionKey(Message message) {
+ if (ObjectHelper.isEmpty(configPartitionKey)) {
+ return message.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
}
- // endpoint take precedence over header configuration
- final Integer partitionKey = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getPartitionKey(),
- () -> exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class));
+ return configPartitionKey;
+ }
- // endpoint take precedence over header configuration
- Object key = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getKey(),
- () -> exchange.getIn().getHeader(KafkaConstants.KEY));
+ protected KeyValueHolder<Object, ProducerRecord<Object, Object>> createKeyValueHolder(Exchange exchange, Message message) {
+ ProducerRecord<Object, Object> record = createRecord(exchange, message);
- if (key != null) {
- key = tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer());
+ return new KeyValueHolder<>(exchange, record);
+ }
+
+ private String evaluateTopic(Message message) {
+ // must remove header so it's not propagated.
+ Object overrideTopic = message.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+ if (overrideTopic != null) {
+ LOG.debug("Using override topic: {}", overrideTopic);
+ return overrideTopic.toString();
}
- // must convert each entry of the iterator into the value according to
- // the serializer
- Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer());
+ String topic = configuration.getTopic();
+ if (topic != null) {
+ return topic;
+ }
+
+ return endpointTopic;
+ }
+
+ private boolean isIterable(Object body) {
+ if (body instanceof Iterable || body instanceof Iterator) {
+ return true;
+ }
+
+ return false;
+ }
- ProducerRecord record = new ProducerRecord(topic, partitionKey, timeStamp, key, value, propagatedHeaders);
- return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator();
+ private Iterator<Object> getObjectIterator(Object body) {
+ Iterator<Object> iterator = null;
+ if (body instanceof Iterable) {
+ iterator = ((Iterable<Object>) body).iterator();
+ } else if (body instanceof Iterator) {
+ iterator = (Iterator<Object>) body;
+ }
+ return iterator;
}
- private List<Header> getPropagatedHeaders(Exchange exchange, KafkaConfiguration getConfiguration) {
- HeaderFilterStrategy headerFilterStrategy = getConfiguration.getHeaderFilterStrategy();
- KafkaHeaderSerializer headerSerializer = getConfiguration.getHeaderSerializer();
- return exchange.getIn().getHeaders().entrySet().stream()
- .filter(entry -> shouldBeFiltered(entry, exchange, headerFilterStrategy))
- .map(entry -> getRecordHeader(entry, headerSerializer)).filter(Objects::nonNull).collect(Collectors.toList());
+ private List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
+ Map<String, Object> messageHeaders = message.getHeaders();
+ List<Header> propagatedHeaders = new ArrayList<>(messageHeaders.size());
+
+ for (Map.Entry<String, Object> header : messageHeaders.entrySet()) {
+ RecordHeader recordHeader = getRecordHeader(header, exchange);
+ if (recordHeader != null) {
+ propagatedHeaders.add(recordHeader);
+ }
+ }
+
+ return propagatedHeaders;
}
- private boolean shouldBeFiltered(
- Map.Entry<String, Object> entry, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
- return !headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange);
+ private boolean shouldBeFiltered(String key, Object value, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+ return !headerFilterStrategy.applyFilterToCamelHeaders(key, value, exchange);
}
- private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, KafkaHeaderSerializer headerSerializer) {
- byte[] headerValue = headerSerializer.serialize(entry.getKey(), entry.getValue());
- if (headerValue == null) {
- return null;
+ private RecordHeader getRecordHeader(
+ Map.Entry<String, Object> entry, Exchange exchange) {
+
+ final HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
+
+ final String key = entry.getKey();
+ final Object value = entry.getValue();
+
+ if (shouldBeFiltered(key, value, exchange, headerFilterStrategy)) {
+ final KafkaHeaderSerializer headerSerializer = configuration.getHeaderSerializer();
+ final byte[] headerValue = headerSerializer.serialize(key, value);
+
+ if (headerValue == null) {
+ return null;
+ }
+ return new RecordHeader(key, headerValue);
}
- return new RecordHeader(entry.getKey(), headerValue);
+
+ return null;
}
@Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
// Camel calls this method if the endpoint isSynchronous(), as the
// KafkaEndpoint creates a SynchronousDelegateProducer for it
public void process(Exchange exchange) throws Exception {
- Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
- List<KeyValueHolder<Object, Future<RecordMetadata>>> futures = new LinkedList<>();
- List<RecordMetadata> recordMetadatas = new ArrayList<>();
+ // is the message body a list or something that contains multiple values
+ Message message = exchange.getIn();
- if (endpoint.getConfiguration().isRecordMetadata()) {
- exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+ if (isIterable(message.getBody())) {
+ processIterableSync(exchange, message);
+ } else {
+ processSingleMessageSync(exchange, message);
}
+ }
+
+ private void processSingleMessageSync(Exchange exchange, Message message) throws InterruptedException, ExecutionException {
+ final ProducerRecord<Object, Object> producerRecord = createRecord(exchange, message);
+
+ final Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
+
+ postProcessMetadata(exchange, future);
+ }
+
+ private void processIterableSync(Exchange exchange, Message message) throws ExecutionException, InterruptedException {
+ List<KeyValueHolder<Object, Future<RecordMetadata>>> futures = new ArrayList<>();
+
+ Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> recordIterable
+ = createRecordIterable(exchange, message);
+
+ // This sets an empty metadata for the very first message on the batch
+ List<RecordMetadata> recordMetadata = new ArrayList<>();
+ if (configuration.isRecordMetadata()) {
+ exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadata);
+ }
+
+ while (recordIterable.hasNext()) {
+ KeyValueHolder<Object, ProducerRecord<Object, Object>> exchangeRecord = recordIterable.next();
+ ProducerRecord<Object, Object> rec = exchangeRecord.getValue();
- while (c.hasNext()) {
- KeyValueHolder<Object, ProducerRecord> exrec = c.next();
- ProducerRecord rec = exrec.getValue();
if (LOG.isDebugEnabled()) {
LOG.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
}
- futures.add(new KeyValueHolder(exrec.getKey(), kafkaProducer.send(rec)));
+
+ futures.add(new KeyValueHolder<>(exchangeRecord.getKey(), kafkaProducer.send(rec)));
}
+
+ postProcessMetadata(futures, recordMetadata);
+ }
+
+ private void postProcessMetadata(
+ List<KeyValueHolder<Object, Future<RecordMetadata>>> futures, List<RecordMetadata> metadataList)
+ throws InterruptedException, ExecutionException {
for (KeyValueHolder<Object, Future<RecordMetadata>> f : futures) {
- // wait for them all to be sent
- List<RecordMetadata> metadata = Collections.singletonList(f.getValue().get());
- recordMetadatas.addAll(metadata);
- Exchange innerExchange = null;
- if (f.getKey() instanceof Exchange) {
- innerExchange = (Exchange) f.getKey();
- if (innerExchange != null) {
- if (endpoint.getConfiguration().isRecordMetadata()) {
- innerExchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
- }
- }
- }
- Message innerMessage = null;
- if (f.getKey() instanceof Message) {
- innerMessage = (Message) f.getKey();
- if (innerMessage != null) {
- if (endpoint.getConfiguration().isRecordMetadata()) {
- innerMessage.setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
- }
- }
- }
+ metadataList.addAll(postProcessMetadata(f.getKey(), f.getValue()));
}
}
+ private List<RecordMetadata> postProcessMetadata(Object key, Future<RecordMetadata> f)
+ throws InterruptedException, ExecutionException {
+ // wait for them all to be sent
+ RecordMetadata metadata = f.get();
+
+ if (configuration.isRecordMetadata()) {
+ List<RecordMetadata> metadataList = Collections.singletonList(metadata);
+
+ ProducerUtil.setRecordMetadata(key, metadataList);
+
+ return metadataList;
+ }
+
+ return Collections.EMPTY_LIST;
+ }
+
@Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
public boolean process(Exchange exchange, AsyncCallback callback) {
+ final KafkaProducerCallBack producerCallBack
+ = new KafkaProducerCallBack(exchange, callback, workerPool, configuration.isRecordMetadata());
+
+ Message message = exchange.getMessage();
+ Object body = message.getBody();
+
try {
- Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
- KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback, workerPool, endpoint.getConfiguration());
- while (c.hasNext()) {
- cb.increment();
- KeyValueHolder<Object, ProducerRecord> exrec = c.next();
- ProducerRecord rec = exrec.getValue();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
- }
- List<Callback> delegates = new ArrayList<>(Arrays.asList(cb));
- if (exrec.getKey() != null) {
- delegates.add(new KafkaProducerCallBack(exrec.getKey(), workerPool, endpoint.getConfiguration()));
- }
- kafkaProducer.send(rec, new DelegatingCallback(delegates.toArray(new Callback[0])));
+ // is the message body a list or something that contains multiple values
+ if (isIterable(body)) {
+ processIterableAsync(exchange, producerCallBack, message);
+ } else {
+ final ProducerRecord<Object, Object> record = createRecord(exchange, message);
+
+ doSend(exchange, record, producerCallBack);
}
- return cb.allSent();
- } catch (Exception ex) {
- exchange.setException(ex);
+
+ return producerCallBack.allSent();
+ } catch (Exception e) {
+ exchange.setException(e);
}
+
callback.done(true);
return true;
}
+
+ private void processIterableAsync(Exchange exchange, KafkaProducerCallBack producerCallBack, Message message) {
+ final Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> c = createRecordIterable(exchange, message);
+
+ while (c.hasNext()) {
+ doSend(c, producerCallBack);
+ }
+ }
+
+ private void doSend(Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> kvIterator, KafkaProducerCallBack cb) {
+ doSend(kvIterator.next(), cb);
+ }
+
+ private void doSend(KeyValueHolder<Object, ProducerRecord<Object, Object>> exchangeRecord, KafkaProducerCallBack cb) {
+ doSend(exchangeRecord.getKey(), exchangeRecord.getValue(), cb);
+ }
+
+ private void doSend(Object key, ProducerRecord<Object, Object> record, KafkaProducerCallBack cb) {
+ cb.increment();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending message to topic: {}, partition: {}, key: {}", record.topic(), record.partition(),
+ record.key());
+ }
+
+ if (key != null) {
+ KafkaProducerMetadataCallBack metadataCallBack = new KafkaProducerMetadataCallBack(
+ key, configuration.isRecordMetadata());
+
+ DelegatingCallback delegatingCallback = new DelegatingCallback(cb, metadataCallBack);
+
+ kafkaProducer.send(record, delegatingCallback);
+ } else {
+ kafkaProducer.send(record, cb);
+ }
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
index 21ced69..cc71667 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
@@ -17,22 +17,23 @@
package org.apache.camel.component.kafka.producer.support;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
public final class DelegatingCallback implements Callback {
- private final List<Callback> callbacks;
+ private final Callback callback1;
+ private final Callback callback2;
- public DelegatingCallback(Callback... callbacks) {
- this.callbacks = Arrays.asList(callbacks);
+ public DelegatingCallback(Callback callback1, Callback callback2) {
+ this.callback1 = callback1;
+ this.callback2 = callback2;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
- callbacks.forEach(c -> c.onCompletion(metadata, exception));
+ callback1.onCompletion(metadata, exception);
+ callback2.onCompletion(metadata, exception);
+
}
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
index 83f09a0..32d0602 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
@@ -19,93 +19,81 @@ package org.apache.camel.component.kafka.producer.support;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaProducerCallBack implements Callback {
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setException;
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setRecordMetadata;
+
+public final class KafkaProducerCallBack implements Callback {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerCallBack.class);
private final Object body;
private final AsyncCallback callback;
- private final AtomicInteger count = new AtomicInteger(1);
- private final List<RecordMetadata> recordMetadatas = new ArrayList<>();
+ private final LongAdder count = new LongAdder();
private final ExecutorService workerPool;
-
- public KafkaProducerCallBack(Object body, ExecutorService workerPool, KafkaConfiguration configuration) {
- this(body, null, workerPool, configuration);
- }
+ private final boolean record;
+ private final List<RecordMetadata> recordMetadataList = new ArrayList<>();
public KafkaProducerCallBack(Object body, AsyncCallback callback, ExecutorService workerPool,
- KafkaConfiguration configuration) {
+ boolean record) {
this.body = body;
this.callback = callback;
- this.workerPool = Objects.requireNonNull(workerPool, "A worker pool must be provided");
-
- if (configuration.isRecordMetadata()) {
- if (body instanceof Exchange) {
- Exchange ex = (Exchange) body;
- ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
- }
- if (body instanceof Message) {
- Message msg = (Message) body;
- msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
- }
+ // The worker pool should be created for both sync and async modes, so checking it
+ // is merely a safeguard
+ assert workerPool != null;
+ this.workerPool = workerPool;
+ this.record = record;
+ count.increment();
+
+ if (record) {
+ setRecordMetadata(body, recordMetadataList);
}
}
public void increment() {
- count.incrementAndGet();
+ count.increment();
}
public boolean allSent() {
- if (count.decrementAndGet() == 0) {
+ count.decrement();
+ if (count.intValue() == 0) {
LOG.trace("All messages sent, continue routing.");
// was able to get all the work done while queuing the requests
- if (callback != null) {
- callback.done(true);
- }
+ callback.done(true);
+
return true;
}
+
return false;
}
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- if (body instanceof Exchange) {
- ((Exchange) body).setException(e);
- }
- if (body instanceof Message && ((Message) body).getExchange() != null) {
- ((Message) body).getExchange().setException(e);
- }
- }
+ setException(body, e);
- recordMetadatas.add(recordMetadata);
+ if (record) {
+ recordMetadataList.add(recordMetadata);
+ }
- if (count.decrementAndGet() == 0) {
+ count.decrement();
+ if (count.intValue() == 0) {
// use worker pool to continue routing the exchange
// as this thread is from Kafka Callback and should not be used
// by Camel routing
- workerPool.submit(new Runnable() {
- @Override
- public void run() {
- LOG.trace("All messages sent, continue routing.");
- if (callback != null) {
- callback.done(false);
- }
- }
- });
+ workerPool.submit(this::doContinueRouting);
}
}
+
+ private void doContinueRouting() {
+ LOG.trace("All messages sent, continue routing (within thread).");
+ callback.done(false);
+ }
+
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
similarity index 61%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
index 21ced69..a199881 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
@@ -17,22 +17,27 @@
package org.apache.camel.component.kafka.producer.support;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
-public final class DelegatingCallback implements Callback {
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setException;
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setRecordMetadata;
- private final List<Callback> callbacks;
+public class KafkaProducerMetadataCallBack implements Callback {
+ private final Object body;
+ private final boolean record;
- public DelegatingCallback(Callback... callbacks) {
- this.callbacks = Arrays.asList(callbacks);
+ public KafkaProducerMetadataCallBack(Object body, boolean record) {
+ this.body = body;
+ this.record = record;
}
@Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- callbacks.forEach(c -> c.onCompletion(metadata, exception));
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+ setException(body, e);
+
+ if (record) {
+ setRecordMetadata(body, recordMetadata);
+ }
}
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
index 63df928..b3ee5e4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.header.Header;
import static org.apache.camel.component.kafka.producer.support.ProducerUtil.tryConvertToSerializedType;
-public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, ProducerRecord>> {
+public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> {
private final Iterator<Object> msgList;
private final Exchange exchange;
private final KafkaConfiguration kafkaConfiguration;
@@ -52,91 +52,102 @@ public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, P
}
@Override
- public KeyValueHolder<Object, ProducerRecord> next() {
+ public KeyValueHolder<Object, ProducerRecord<Object, Object>> next() {
// must convert each entry of the iterator into the value
// according to the serializer
- Object next = msgList.next();
- String innerTopic = msgTopic;
- Object innerKey = null;
- Integer innerPartitionKey = null;
- Long innerTimestamp = null;
-
- Object value = next;
- Exchange ex = null;
- Object body = next;
-
- if (next instanceof Exchange || next instanceof Message) {
- Exchange innerExchange = null;
- Message innerMessage = null;
- if (next instanceof Exchange) {
- innerExchange = (Exchange) next;
- innerMessage = innerExchange.getIn();
- } else {
- innerMessage = (Message) next;
- }
+ final Object body = msgList.next();
- innerTopic = getInnerTopic(innerTopic, innerMessage);
+ if (body instanceof Exchange || body instanceof Message) {
+ final Message innerMessage = getInnerMessage(body);
+ final Exchange innerExchange = getInnerExchange(body);
- if (innerMessage.getHeader(KafkaConstants.PARTITION_KEY) != null) {
- innerPartitionKey = getInnerPartitionKey(innerMessage);
- }
+ final String innerTopic = getInnerTopic(innerMessage);
+ final Integer innerPartitionKey = getInnerPartitionKey(innerMessage);
+ final Object innerKey = getInnerKey(innerExchange, innerMessage);
+ final Long innerTimestamp = getOverrideTimestamp(innerMessage);
- if (innerMessage.getHeader(KafkaConstants.KEY) != null) {
- innerKey = getInnerKey(innerExchange, innerMessage);
- }
+ final Exchange ex = innerExchange == null ? exchange : innerExchange;
- innerTimestamp = getOverrideTimestamp(innerTimestamp, innerMessage);
-
- ex = innerExchange == null ? exchange : innerExchange;
- value = tryConvertToSerializedType(ex, innerMessage.getBody(),
+ final Object value = tryConvertToSerializedType(ex, innerMessage.getBody(),
kafkaConfiguration.getValueSerializer());
+
+ return new KeyValueHolder<>(
+ body,
+ new ProducerRecord<>(
+ innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders));
}
- return new KeyValueHolder(
+ return new KeyValueHolder<>(
body,
- new ProducerRecord(
- innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders));
+ new ProducerRecord<>(
+ msgTopic, null, null, null, body, propagatedHeaders));
+ }
+
+ private Message getInnerMessage(Object body) {
+ if (body instanceof Exchange) {
+ return ((Exchange) body).getIn();
+ }
+
+ return (Message) body;
+ }
+
+ private Exchange getInnerExchange(Object body) {
+ if (body instanceof Exchange) {
+ return (Exchange) body;
+ }
+
+ return null;
}
- private boolean hasValidTimestampHeader(Message innerMessage) {
- if (innerMessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) != null) {
- return innerMessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) instanceof Long;
+ private boolean hasValidTimestampHeader(Object timeStamp) {
+ if (timeStamp != null) {
+ return timeStamp instanceof Long;
}
return false;
}
- private Long getOverrideTimestamp(Long innerTimestamp, Message innerMessage) {
- if (hasValidTimestampHeader(innerMessage)) {
- innerTimestamp = (Long) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+ private Long getOverrideTimestamp(Message innerMessage) {
+ Object objTimestamp = innerMessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+
+ if (hasValidTimestampHeader(objTimestamp)) {
+ return (Long) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
}
- return innerTimestamp;
+ return null;
}
- private String getInnerTopic(String innerTopic, Message innerMessage) {
+ private String getInnerTopic(Message innerMessage) {
if (innerMessage.getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
- innerTopic = (String) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+ return (String) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
}
- return innerTopic;
+ return msgTopic;
}
- private Object getInnerKey(Exchange innerExchange, Message innerMmessage) {
- Object innerKey;
- innerKey = kafkaConfiguration.getKey() != null
- ? kafkaConfiguration.getKey() : innerMmessage.getHeader(KafkaConstants.KEY);
+ private Object getInnerKey(Exchange innerExchange, Message innerMessage) {
+ Object innerKey = innerMessage.getHeader(KafkaConstants.KEY);
if (innerKey != null) {
- innerKey = tryConvertToSerializedType(innerExchange, innerKey,
- kafkaConfiguration.getKeySerializer());
+
+ innerKey = kafkaConfiguration.getKey() != null ? kafkaConfiguration.getKey() : innerKey;
+
+ if (innerKey != null) {
+ innerKey = tryConvertToSerializedType(innerExchange, innerKey,
+ kafkaConfiguration.getKeySerializer());
+ }
+
+ return innerKey;
}
- return innerKey;
+
+ return null;
}
private Integer getInnerPartitionKey(Message innerMessage) {
+ Integer partitionKey = innerMessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+
return kafkaConfiguration.getPartitionKey() != null
? kafkaConfiguration.getPartitionKey()
- : innerMessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+ : partitionKey;
}
@Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
index 0295942..5b0499b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
@@ -18,9 +18,13 @@
package org.apache.camel.component.kafka.producer.support;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Bytes;
public final class ProducerUtil {
@@ -52,4 +56,32 @@ public final class ProducerUtil {
return answer != null ? answer : object;
}
+
+ static void setException(Object body, Exception e) {
+ if (e != null) {
+ if (body instanceof Exchange) {
+ ((Exchange) body).setException(e);
+ }
+ if (body instanceof Message && ((Message) body).getExchange() != null) {
+ ((Message) body).getExchange().setException(e);
+ }
+ }
+ }
+
+ static void setRecordMetadata(Object body, RecordMetadata recordMetadata) {
+ final List<RecordMetadata> recordMetadataList = Collections.singletonList(recordMetadata);
+
+ setRecordMetadata(body, recordMetadataList);
+ }
+
+ public static void setRecordMetadata(Object body, List<RecordMetadata> recordMetadataList) {
+ if (body instanceof Exchange) {
+ Exchange ex = (Exchange) body;
+ ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+ }
+ if (body instanceof Message) {
+ Message msg = (Message) body;
+ msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+ }
+ }
}