Posted to commits@camel.apache.org by da...@apache.org on 2021/12/01 05:54:19 UTC

[camel] branch main updated: CAMEL-17240: cleanup the camel-kafka producer code (#6491)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ddc8e3  CAMEL-17240: cleanup the camel-kafka producer code (#6491)
0ddc8e3 is described below

commit 0ddc8e33b0d1035478239875424388a5f676ca6e
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Dec 1 06:53:35 2021 +0100

    CAMEL-17240: cleanup the camel-kafka producer code (#6491)
    
    Includes:
    - rework the producer to reduce the number of allocations and GC
      pressure
    - replace AtomicInteger with LongAdder to reduce the risk of contention
      (and gain a minor performance improvement); a minimal sketch of the
      counting pattern follows this list
    - logging improvements in the hot path
    - rework the callbacks to avoid creating threads unnecessarily
    - avoid creating arrays/collections for the delegating callback (the
      before/after call sites are contrasted below, after its diff)
    - use singleton collections for single-item lists where possible
    - clean up some of the code to use generics
    - use an assert to check that the internal worker pool is not null, as
      it appears to be faster
    - collect the propagated headers in a single pass over the header set
    - clean up the record iterator to simplify it and reduce calls to
      getHeader
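
A minimal, self-contained sketch (assumed names, not the Camel code itself) of the
counting pattern the reworked callback relies on: the count starts at one for the
callback itself, is incremented once per record handed to send(), and is decremented
once per completion plus once after the send loop finishes queuing; routing continues
when it reaches zero.

    import java.util.concurrent.atomic.LongAdder;

    class CountingCallbackSketch {
        // starts at 1 so completions that race ahead of queuing
        // cannot drive the count to zero prematurely
        private final LongAdder count = new LongAdder();

        CountingCallbackSketch() {
            count.increment();
        }

        // called once per record handed to send()
        void onSendQueued() {
            count.increment();
        }

        // called once after the send loop has queued everything
        boolean allQueued() {
            count.decrement();
            return count.intValue() == 0;
        }

        // called by the client once per acknowledged record
        boolean onCompletion() {
            count.decrement();
            return count.intValue() == 0;
        }
    }
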
---
 .../camel/component/kafka/KafkaProducer.java       | 347 ++++++++++++++-------
 .../kafka/producer/support/DelegatingCallback.java |  15 +-
 .../producer/support/KafkaProducerCallBack.java    |  86 +++--
 ...ack.java => KafkaProducerMetadataCallBack.java} |  23 +-
 .../producer/support/KeyValueHolderIterator.java   | 117 +++----
 .../kafka/producer/support/ProducerUtil.java       |  32 ++
 6 files changed, 389 insertions(+), 231 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 5560514..a89ecdd 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -18,31 +18,29 @@ package org.apache.camel.component.kafka;
 
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.kafka.producer.support.DelegatingCallback;
 import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
+import org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCallBack;
 import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
+import org.apache.camel.component.kafka.producer.support.ProducerUtil;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
-import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -60,20 +58,29 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings("rawtypes")
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
+    private final KafkaConfiguration configuration;
     private ExecutorService workerPool;
     private boolean shutdownWorkerPool;
     private volatile boolean closeKafkaProducer;
+    private final String endpointTopic;
+    private final Integer configPartitionKey;
+    private final String configKey;
 
     public KafkaProducer(KafkaEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
+        this.configuration = endpoint.getConfiguration();
+
+        endpointTopic = URISupport.extractRemainderPath(URI.create(endpoint.getEndpointUri()), true);
+        configPartitionKey = configuration.getPartitionKey();
+        configKey = configuration.getKey();
     }
 
     Properties getProps() {
-        Properties props = endpoint.getConfiguration().createProducerProperties();
+        Properties props = configuration.createProducerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        String brokers = endpoint.getKafkaClientFactory().getBrokers(configuration);
         if (brokers != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
@@ -111,7 +118,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         // if we are in asynchronous mode we need a worker pool
-        if (!endpoint.getConfiguration().isSynchronous() && workerPool == null) {
+        if (!configuration.isSynchronous() && workerPool == null) {
             workerPool = endpoint.createProducerExecutor();
             // we create a thread pool so we should also shut it down
             shutdownWorkerPool = true;
@@ -143,167 +150,281 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         if (shutdownWorkerPool && workerPool != null) {
-            int timeout = endpoint.getConfiguration().getShutdownTimeout();
+            int timeout = configuration.getShutdownTimeout();
             LOG.debug("Shutting down Kafka producer worker threads with timeout {} millis", timeout);
             endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool, timeout);
             workerPool = null;
         }
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception {
-        String topic = endpoint.getConfiguration().getTopic();
-        Long timeStamp = null;
+    protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(
+            Exchange exchange, Message message) {
+        String topic = evaluateTopic(message);
 
-        // must remove header so its not propagated
-        Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
-        if (overrideTopic != null) {
-            LOG.debug("Using override topic: {}", overrideTopic);
-            topic = overrideTopic.toString();
-        }
+        // extracting headers which need to be propagated
+        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, message);
 
-        if (topic == null) {
-            // if topic property was not received from configuration or header
-            // parameters take it from the remaining URI
-            topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
-        }
+        Object body = message.getBody();
 
-        Object overrideTimeStamp = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+        Iterator<Object> iterator = getObjectIterator(body);
+
+        return new KeyValueHolderIterator(iterator, exchange, configuration, topic, propagatedHeaders);
+    }
+
+    protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message) {
+        String topic = evaluateTopic(message);
+
+        Long timeStamp = null;
+        Object overrideTimeStamp = message.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
         if (overrideTimeStamp instanceof Long) {
             LOG.debug("Using override TimeStamp: {}", overrideTimeStamp);
             timeStamp = (Long) overrideTimeStamp;
         }
 
         // extracting headers which need to be propagated
-        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
+        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, message);
 
-        Object msg = exchange.getIn().getBody();
+        final Integer msgPartitionKey = getOverridePartitionKey(message);
+        Object msgKey = getOverrideKey(message);
 
-        // is the message body a list or something that contains multiple values
-        Iterator<Object> iterator = null;
-        if (msg instanceof Iterable) {
-            iterator = ((Iterable<Object>) msg).iterator();
-        } else if (msg instanceof Iterator) {
-            iterator = (Iterator<Object>) msg;
+        if (msgKey != null) {
+            msgKey = tryConvertToSerializedType(exchange, msgKey, configuration.getKeySerializer());
+        }
+
+        // must convert each entry of the iterator into the value according to
+        // the serializer
+        Object value = tryConvertToSerializedType(exchange, message.getBody(), configuration.getValueSerializer());
+
+        return new ProducerRecord<>(topic, msgPartitionKey, timeStamp, msgKey, value, propagatedHeaders);
+    }
+
+    private Object getOverrideKey(Message message) {
+        if (ObjectHelper.isEmpty(configKey)) {
+            return message.getHeader(KafkaConstants.KEY);
         }
-        if (iterator != null) {
-            final Iterator<Object> msgList = iterator;
-            final String msgTopic = topic;
 
-            return new KeyValueHolderIterator(msgList, exchange, endpoint.getConfiguration(), msgTopic, propagatedHeaders);
+        return configKey;
+    }
+
+    private Integer getOverridePartitionKey(Message message) {
+        if (ObjectHelper.isEmpty(configPartitionKey)) {
+            return message.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
         }
 
-        // endpoint take precedence over header configuration
-        final Integer partitionKey = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getPartitionKey(),
-                () -> exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class));
+        return configPartitionKey;
+    }
 
-        // endpoint take precedence over header configuration
-        Object key = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getKey(),
-                () -> exchange.getIn().getHeader(KafkaConstants.KEY));
+    protected KeyValueHolder<Object, ProducerRecord<Object, Object>> createKeyValueHolder(Exchange exchange, Message message) {
+        ProducerRecord<Object, Object> record = createRecord(exchange, message);
 
-        if (key != null) {
-            key = tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer());
+        return new KeyValueHolder<>(exchange, record);
+    }
+
+    private String evaluateTopic(Message message) {
+        // must remove header so it's not propagated.
+        Object overrideTopic = message.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+        if (overrideTopic != null) {
+            LOG.debug("Using override topic: {}", overrideTopic);
+            return overrideTopic.toString();
         }
 
-        // must convert each entry of the iterator into the value according to
-        // the serializer
-        Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer());
+        String topic = configuration.getTopic();
+        if (topic != null) {
+            return topic;
+        }
+
+        return endpointTopic;
+    }
+
+    private boolean isIterable(Object body) {
+        if (body instanceof Iterable || body instanceof Iterator) {
+            return true;
+        }
+
+        return false;
+    }
 
-        ProducerRecord record = new ProducerRecord(topic, partitionKey, timeStamp, key, value, propagatedHeaders);
-        return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator();
+    private Iterator<Object> getObjectIterator(Object body) {
+        Iterator<Object> iterator = null;
+        if (body instanceof Iterable) {
+            iterator = ((Iterable<Object>) body).iterator();
+        } else if (body instanceof Iterator) {
+            iterator = (Iterator<Object>) body;
+        }
+        return iterator;
     }
 
-    private List<Header> getPropagatedHeaders(Exchange exchange, KafkaConfiguration getConfiguration) {
-        HeaderFilterStrategy headerFilterStrategy = getConfiguration.getHeaderFilterStrategy();
-        KafkaHeaderSerializer headerSerializer = getConfiguration.getHeaderSerializer();
-        return exchange.getIn().getHeaders().entrySet().stream()
-                .filter(entry -> shouldBeFiltered(entry, exchange, headerFilterStrategy))
-                .map(entry -> getRecordHeader(entry, headerSerializer)).filter(Objects::nonNull).collect(Collectors.toList());
+    private List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
+        Map<String, Object> messageHeaders = message.getHeaders();
+        List<Header> propagatedHeaders = new ArrayList<>(messageHeaders.size());
+
+        for (Map.Entry<String, Object> header : messageHeaders.entrySet()) {
+            RecordHeader recordHeader = getRecordHeader(header, exchange);
+            if (recordHeader != null) {
+                propagatedHeaders.add(recordHeader);
+            }
+        }
+
+        return propagatedHeaders;
     }
 
-    private boolean shouldBeFiltered(
-            Map.Entry<String, Object> entry, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
-        return !headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange);
+    private boolean shouldBeFiltered(String key, Object value, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        return !headerFilterStrategy.applyFilterToCamelHeaders(key, value, exchange);
     }
 
-    private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, KafkaHeaderSerializer headerSerializer) {
-        byte[] headerValue = headerSerializer.serialize(entry.getKey(), entry.getValue());
-        if (headerValue == null) {
-            return null;
+    private RecordHeader getRecordHeader(
+            Map.Entry<String, Object> entry, Exchange exchange) {
+
+        final HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
+
+        final String key = entry.getKey();
+        final Object value = entry.getValue();
+
+        if (shouldBeFiltered(key, value, exchange, headerFilterStrategy)) {
+            final KafkaHeaderSerializer headerSerializer = configuration.getHeaderSerializer();
+            final byte[] headerValue = headerSerializer.serialize(key, value);
+
+            if (headerValue == null) {
+                return null;
+            }
+            return new RecordHeader(key, headerValue);
         }
-        return new RecordHeader(entry.getKey(), headerValue);
+
+        return null;
     }
 
     @Override
-    @SuppressWarnings({ "unchecked", "rawtypes" })
     // Camel calls this method if the endpoint isSynchronous(), as the
     // KafkaEndpoint creates a SynchronousDelegateProducer for it
     public void process(Exchange exchange) throws Exception {
-        Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
-        List<KeyValueHolder<Object, Future<RecordMetadata>>> futures = new LinkedList<>();
-        List<RecordMetadata> recordMetadatas = new ArrayList<>();
+        // is the message body a list or something that contains multiple values
+        Message message = exchange.getIn();
 
-        if (endpoint.getConfiguration().isRecordMetadata()) {
-            exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+        if (isIterable(message.getBody())) {
+            processIterableSync(exchange, message);
+        } else {
+            processSingleMessageSync(exchange, message);
         }
+    }
+
+    private void processSingleMessageSync(Exchange exchange, Message message) throws InterruptedException, ExecutionException {
+        final ProducerRecord<Object, Object> producerRecord = createRecord(exchange, message);
+
+        final Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
+
+        postProcessMetadata(exchange, future);
+    }
+
+    private void processIterableSync(Exchange exchange, Message message) throws ExecutionException, InterruptedException {
+        List<KeyValueHolder<Object, Future<RecordMetadata>>> futures = new ArrayList<>();
+
+        Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> recordIterable
+                = createRecordIterable(exchange, message);
+
+        // This sets an empty metadata for the very first message on the batch
+        List<RecordMetadata> recordMetadata = new ArrayList<>();
+        if (configuration.isRecordMetadata()) {
+            exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadata);
+        }
+
+        while (recordIterable.hasNext()) {
+            KeyValueHolder<Object, ProducerRecord<Object, Object>> exchangeRecord = recordIterable.next();
+            ProducerRecord<Object, Object> rec = exchangeRecord.getValue();
 
-        while (c.hasNext()) {
-            KeyValueHolder<Object, ProducerRecord> exrec = c.next();
-            ProducerRecord rec = exrec.getValue();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
             }
-            futures.add(new KeyValueHolder(exrec.getKey(), kafkaProducer.send(rec)));
+
+            futures.add(new KeyValueHolder<>(exchangeRecord.getKey(), kafkaProducer.send(rec)));
         }
+
+        postProcessMetadata(futures, recordMetadata);
+    }
+
+    private void postProcessMetadata(
+            List<KeyValueHolder<Object, Future<RecordMetadata>>> futures, List<RecordMetadata> metadataList)
+            throws InterruptedException, ExecutionException {
         for (KeyValueHolder<Object, Future<RecordMetadata>> f : futures) {
-            // wait for them all to be sent
-            List<RecordMetadata> metadata = Collections.singletonList(f.getValue().get());
-            recordMetadatas.addAll(metadata);
-            Exchange innerExchange = null;
-            if (f.getKey() instanceof Exchange) {
-                innerExchange = (Exchange) f.getKey();
-                if (innerExchange != null) {
-                    if (endpoint.getConfiguration().isRecordMetadata()) {
-                        innerExchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
-                    }
-                }
-            }
-            Message innerMessage = null;
-            if (f.getKey() instanceof Message) {
-                innerMessage = (Message) f.getKey();
-                if (innerMessage != null) {
-                    if (endpoint.getConfiguration().isRecordMetadata()) {
-                        innerMessage.setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
-                    }
-                }
-            }
+            metadataList.addAll(postProcessMetadata(f.getKey(), f.getValue()));
         }
     }
 
+    private List<RecordMetadata> postProcessMetadata(Object key, Future<RecordMetadata> f)
+            throws InterruptedException, ExecutionException {
+        // wait for them all to be sent
+        RecordMetadata metadata = f.get();
+
+        if (configuration.isRecordMetadata()) {
+            List<RecordMetadata> metadataList = Collections.singletonList(metadata);
+
+            ProducerUtil.setRecordMetadata(key, metadataList);
+
+            return metadataList;
+        }
+
+        return Collections.EMPTY_LIST;
+    }
+
     @Override
-    @SuppressWarnings({ "unchecked", "rawtypes" })
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        final KafkaProducerCallBack producerCallBack
+                = new KafkaProducerCallBack(exchange, callback, workerPool, configuration.isRecordMetadata());
+
+        Message message = exchange.getMessage();
+        Object body = message.getBody();
+
         try {
-            Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
-            KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback, workerPool, endpoint.getConfiguration());
-            while (c.hasNext()) {
-                cb.increment();
-                KeyValueHolder<Object, ProducerRecord> exrec = c.next();
-                ProducerRecord rec = exrec.getValue();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
-                }
-                List<Callback> delegates = new ArrayList<>(Arrays.asList(cb));
-                if (exrec.getKey() != null) {
-                    delegates.add(new KafkaProducerCallBack(exrec.getKey(), workerPool, endpoint.getConfiguration()));
-                }
-                kafkaProducer.send(rec, new DelegatingCallback(delegates.toArray(new Callback[0])));
+            // is the message body a list or something that contains multiple values
+            if (isIterable(body)) {
+                processIterableAsync(exchange, producerCallBack, message);
+            } else {
+                final ProducerRecord<Object, Object> record = createRecord(exchange, message);
+
+                doSend(exchange, record, producerCallBack);
             }
-            return cb.allSent();
-        } catch (Exception ex) {
-            exchange.setException(ex);
+
+            return producerCallBack.allSent();
+        } catch (Exception e) {
+            exchange.setException(e);
         }
+
         callback.done(true);
         return true;
     }
+
+    private void processIterableAsync(Exchange exchange, KafkaProducerCallBack producerCallBack, Message message) {
+        final Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> c = createRecordIterable(exchange, message);
+
+        while (c.hasNext()) {
+            doSend(c, producerCallBack);
+        }
+    }
+
+    private void doSend(Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> kvIterator, KafkaProducerCallBack cb) {
+        doSend(kvIterator.next(), cb);
+    }
+
+    private void doSend(KeyValueHolder<Object, ProducerRecord<Object, Object>> exchangeRecord, KafkaProducerCallBack cb) {
+        doSend(exchangeRecord.getKey(), exchangeRecord.getValue(), cb);
+    }
+
+    private void doSend(Object key, ProducerRecord<Object, Object> record, KafkaProducerCallBack cb) {
+        cb.increment();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending message to topic: {}, partition: {}, key: {}", record.topic(), record.partition(),
+                    record.key());
+        }
+
+        if (key != null) {
+            KafkaProducerMetadataCallBack metadataCallBack = new KafkaProducerMetadataCallBack(
+                    key, configuration.isRecordMetadata());
+
+            DelegatingCallback delegatingCallback = new DelegatingCallback(cb, metadataCallBack);
+
+            kafkaProducer.send(record, delegatingCallback);
+        } else {
+            kafkaProducer.send(record, cb);
+        }
+    }
 }
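
As a hedged usage sketch of the dispatch above (the endpoint URI, topic and broker
address are hypothetical): a plain body goes through createRecord(...) and produces a
single record, while an Iterable or Iterator body goes through
createRecordIterable(...) and fans out into one record per element.

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaProducerRoutes extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            // a plain body -> createRecord(...) -> one ProducerRecord
            from("direct:single")
                    .to("kafka:orders?brokers=localhost:9092");

            // a List/Iterator body -> createRecordIterable(...) -> one
            // ProducerRecord per element; elements may themselves be
            // Exchanges or Messages carrying per-record override headers
            from("direct:batch")
                    .to("kafka:orders?brokers=localhost:9092");
        }
    }
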
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
index 21ced69..cc71667 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
@@ -17,22 +17,23 @@
 
 package org.apache.camel.component.kafka.producer.support;
 
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 public final class DelegatingCallback implements Callback {
 
-    private final List<Callback> callbacks;
+    private final Callback callback1;
+    private final Callback callback2;
 
-    public DelegatingCallback(Callback... callbacks) {
-        this.callbacks = Arrays.asList(callbacks);
+    public DelegatingCallback(Callback callback1, Callback callback2) {
+        this.callback1 = callback1;
+        this.callback2 = callback2;
     }
 
     @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
-        callbacks.forEach(c -> c.onCompletion(metadata, exception));
+        callback1.onCompletion(metadata, exception);
+        callback2.onCompletion(metadata, exception);
+
     }
 }
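
For context on the allocations this removes: previously every send with a per-record
key built an ArrayList, an Arrays.asList wrapper and a Callback[] just to carry two
callbacks. The excerpt below contrasts the two call sites, taken from the
KafkaProducer.java diff in this commit:

    // before: several transient objects per send
    List<Callback> delegates = new ArrayList<>(Arrays.asList(cb));
    delegates.add(new KafkaProducerCallBack(exrec.getKey(), workerPool, endpoint.getConfiguration()));
    kafkaProducer.send(rec, new DelegatingCallback(delegates.toArray(new Callback[0])));

    // after: the two callbacks are plain constructor arguments
    kafkaProducer.send(record, new DelegatingCallback(cb, metadataCallBack));
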
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
index 83f09a0..32d0602 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
@@ -19,93 +19,81 @@ package org.apache.camel.component.kafka.producer.support;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaProducerCallBack implements Callback {
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setException;
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setRecordMetadata;
+
+public final class KafkaProducerCallBack implements Callback {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerCallBack.class);
 
     private final Object body;
     private final AsyncCallback callback;
-    private final AtomicInteger count = new AtomicInteger(1);
-    private final List<RecordMetadata> recordMetadatas = new ArrayList<>();
+    private final LongAdder count = new LongAdder();
     private final ExecutorService workerPool;
-
-    public KafkaProducerCallBack(Object body, ExecutorService workerPool, KafkaConfiguration configuration) {
-        this(body, null, workerPool, configuration);
-    }
+    private final boolean record;
+    private final List<RecordMetadata> recordMetadataList = new ArrayList<>();
 
     public KafkaProducerCallBack(Object body, AsyncCallback callback, ExecutorService workerPool,
-                                 KafkaConfiguration configuration) {
+                                 boolean record) {
         this.body = body;
         this.callback = callback;
-        this.workerPool = Objects.requireNonNull(workerPool, "A worker pool must be provided");
-
-        if (configuration.isRecordMetadata()) {
-            if (body instanceof Exchange) {
-                Exchange ex = (Exchange) body;
-                ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
-            }
-            if (body instanceof Message) {
-                Message msg = (Message) body;
-                msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
-            }
+        // The worker pool should be created for both sync and async modes, so checking it
+        // is merely a safeguard
+        assert workerPool != null;
+        this.workerPool = workerPool;
+        this.record = record;
+        count.increment();
+
+        if (record) {
+            setRecordMetadata(body, recordMetadataList);
         }
     }
 
     public void increment() {
-        count.incrementAndGet();
+        count.increment();
     }
 
     public boolean allSent() {
-        if (count.decrementAndGet() == 0) {
+        count.decrement();
+        if (count.intValue() == 0) {
             LOG.trace("All messages sent, continue routing.");
             // was able to get all the work done while queuing the requests
-            if (callback != null) {
-                callback.done(true);
-            }
+            callback.done(true);
+
             return true;
         }
+
         return false;
     }
 
     @Override
     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-        if (e != null) {
-            if (body instanceof Exchange) {
-                ((Exchange) body).setException(e);
-            }
-            if (body instanceof Message && ((Message) body).getExchange() != null) {
-                ((Message) body).getExchange().setException(e);
-            }
-        }
+        setException(body, e);
 
-        recordMetadatas.add(recordMetadata);
+        if (record) {
+            recordMetadataList.add(recordMetadata);
+        }
 
-        if (count.decrementAndGet() == 0) {
+        count.decrement();
+        if (count.intValue() == 0) {
             // use worker pool to continue routing the exchange
             // as this thread is from Kafka Callback and should not be used
             // by Camel routing
-            workerPool.submit(new Runnable() {
-                @Override
-                public void run() {
-                    LOG.trace("All messages sent, continue routing.");
-                    if (callback != null) {
-                        callback.done(false);
-                    }
-                }
-            });
+            workerPool.submit(this::doContinueRouting);
         }
     }
+
+    private void doContinueRouting() {
+        LOG.trace("All messages sent, continue routing (within thread).");
+        callback.done(false);
+    }
+
 }
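
A minimal sketch (assumed names) of why the completion path resubmits to the worker
pool: the Kafka client invokes onCompletion on its internal I/O (sender) thread, which
must not be tied up by Camel routing, so the continuation is handed off.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class HandoffSketch {
        private final ExecutorService workerPool = Executors.newFixedThreadPool(2);

        // Kafka calls this on its I/O (sender) thread
        void onCompletion(Exception exception) {
            workerPool.submit(this::continueRouting);
        }

        private void continueRouting() {
            // safe place for potentially blocking route work
            System.out.println("routing on " + Thread.currentThread().getName());
        }
    }
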
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
similarity index 61%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
index 21ced69..a199881 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
@@ -17,22 +17,27 @@
 
 package org.apache.camel.component.kafka.producer.support;
 
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 
-public final class DelegatingCallback implements Callback {
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setException;
+import static org.apache.camel.component.kafka.producer.support.ProducerUtil.setRecordMetadata;
 
-    private final List<Callback> callbacks;
+public class KafkaProducerMetadataCallBack implements Callback {
+    private final Object body;
+    private final boolean record;
 
-    public DelegatingCallback(Callback... callbacks) {
-        this.callbacks = Arrays.asList(callbacks);
+    public KafkaProducerMetadataCallBack(Object body, boolean record) {
+        this.body = body;
+        this.record = record;
     }
 
     @Override
-    public void onCompletion(RecordMetadata metadata, Exception exception) {
-        callbacks.forEach(c -> c.onCompletion(metadata, exception));
+    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+        setException(body, e);
+
+        if (record) {
+            setRecordMetadata(body, recordMetadata);
+        }
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
index 63df928..b3ee5e4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.header.Header;
 
 import static org.apache.camel.component.kafka.producer.support.ProducerUtil.tryConvertToSerializedType;
 
-public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, ProducerRecord>> {
+public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> {
     private final Iterator<Object> msgList;
     private final Exchange exchange;
     private final KafkaConfiguration kafkaConfiguration;
@@ -52,91 +52,102 @@ public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, P
     }
 
     @Override
-    public KeyValueHolder<Object, ProducerRecord> next() {
+    public KeyValueHolder<Object, ProducerRecord<Object, Object>> next() {
         // must convert each entry of the iterator into the value
         // according to the serializer
-        Object next = msgList.next();
-        String innerTopic = msgTopic;
-        Object innerKey = null;
-        Integer innerPartitionKey = null;
-        Long innerTimestamp = null;
-
-        Object value = next;
-        Exchange ex = null;
-        Object body = next;
-
-        if (next instanceof Exchange || next instanceof Message) {
-            Exchange innerExchange = null;
-            Message innerMessage = null;
-            if (next instanceof Exchange) {
-                innerExchange = (Exchange) next;
-                innerMessage = innerExchange.getIn();
-            } else {
-                innerMessage = (Message) next;
-            }
+        final Object body = msgList.next();
 
-            innerTopic = getInnerTopic(innerTopic, innerMessage);
+        if (body instanceof Exchange || body instanceof Message) {
+            final Message innerMessage = getInnerMessage(body);
+            final Exchange innerExchange = getInnerExchange(body);
 
-            if (innerMessage.getHeader(KafkaConstants.PARTITION_KEY) != null) {
-                innerPartitionKey = getInnerPartitionKey(innerMessage);
-            }
+            final String innerTopic = getInnerTopic(innerMessage);
+            final Integer innerPartitionKey = getInnerPartitionKey(innerMessage);
+            final Object innerKey = getInnerKey(innerExchange, innerMessage);
+            final Long innerTimestamp = getOverrideTimestamp(innerMessage);
 
-            if (innerMessage.getHeader(KafkaConstants.KEY) != null) {
-                innerKey = getInnerKey(innerExchange, innerMessage);
-            }
+            final Exchange ex = innerExchange == null ? exchange : innerExchange;
 
-            innerTimestamp = getOverrideTimestamp(innerTimestamp, innerMessage);
-
-            ex = innerExchange == null ? exchange : innerExchange;
-            value = tryConvertToSerializedType(ex, innerMessage.getBody(),
+            final Object value = tryConvertToSerializedType(ex, innerMessage.getBody(),
                     kafkaConfiguration.getValueSerializer());
+
+            return new KeyValueHolder<>(
+                    body,
+                    new ProducerRecord<>(
+                            innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders));
         }
 
-        return new KeyValueHolder(
+        return new KeyValueHolder<>(
                 body,
-                new ProducerRecord(
-                        innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders));
+                new ProducerRecord<>(
+                        msgTopic, null, null, null, body, propagatedHeaders));
+    }
+
+    private Message getInnerMessage(Object body) {
+        if (body instanceof Exchange) {
+            return ((Exchange) body).getIn();
+        }
+
+        return (Message) body;
+    }
+
+    private Exchange getInnerExchange(Object body) {
+        if (body instanceof Exchange) {
+            return (Exchange) body;
+        }
+
+        return null;
     }
 
-    private boolean hasValidTimestampHeader(Message innerMessage) {
-        if (innerMessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) != null) {
-            return innerMessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) instanceof Long;
+    private boolean hasValidTimestampHeader(Object timeStamp) {
+        if (timeStamp != null) {
+            return timeStamp instanceof Long;
         }
 
         return false;
     }
 
-    private Long getOverrideTimestamp(Long innerTimestamp, Message innerMessage) {
-        if (hasValidTimestampHeader(innerMessage)) {
-            innerTimestamp = (Long) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+    private Long getOverrideTimestamp(Message innerMessage) {
+        Object objTimestamp = innerMessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+
+        if (hasValidTimestampHeader(objTimestamp)) {
+            return (Long) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
         }
 
-        return innerTimestamp;
+        return null;
     }
 
-    private String getInnerTopic(String innerTopic, Message innerMessage) {
+    private String getInnerTopic(Message innerMessage) {
         if (innerMessage.getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
-            innerTopic = (String) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+            return (String) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
         }
 
-        return innerTopic;
+        return msgTopic;
     }
 
-    private Object getInnerKey(Exchange innerExchange, Message innerMmessage) {
-        Object innerKey;
-        innerKey = kafkaConfiguration.getKey() != null
-                ? kafkaConfiguration.getKey() : innerMmessage.getHeader(KafkaConstants.KEY);
+    private Object getInnerKey(Exchange innerExchange, Message innerMessage) {
+        Object innerKey = innerMessage.getHeader(KafkaConstants.KEY);
         if (innerKey != null) {
-            innerKey = tryConvertToSerializedType(innerExchange, innerKey,
-                    kafkaConfiguration.getKeySerializer());
+
+            innerKey = kafkaConfiguration.getKey() != null ? kafkaConfiguration.getKey() : innerKey;
+
+            if (innerKey != null) {
+                innerKey = tryConvertToSerializedType(innerExchange, innerKey,
+                        kafkaConfiguration.getKeySerializer());
+            }
+
+            return innerKey;
         }
-        return innerKey;
+
+        return null;
     }
 
     private Integer getInnerPartitionKey(Message innerMessage) {
+        Integer partitionKey = innerMessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+
         return kafkaConfiguration.getPartitionKey() != null
                 ? kafkaConfiguration.getPartitionKey()
-                : innerMessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+                : partitionKey;
     }
 
     @Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
index 0295942..5b0499b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
@@ -18,9 +18,13 @@
 package org.apache.camel.component.kafka.producer.support;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.utils.Bytes;
 
 public final class ProducerUtil {
@@ -52,4 +56,32 @@ public final class ProducerUtil {
 
         return answer != null ? answer : object;
     }
+
+    static void setException(Object body, Exception e) {
+        if (e != null) {
+            if (body instanceof Exchange) {
+                ((Exchange) body).setException(e);
+            }
+            if (body instanceof Message && ((Message) body).getExchange() != null) {
+                ((Message) body).getExchange().setException(e);
+            }
+        }
+    }
+
+    static void setRecordMetadata(Object body, RecordMetadata recordMetadata) {
+        final List<RecordMetadata> recordMetadataList = Collections.singletonList(recordMetadata);
+
+        setRecordMetadata(body, recordMetadataList);
+    }
+
+    public static void setRecordMetadata(Object body, List<RecordMetadata> recordMetadataList) {
+        if (body instanceof Exchange) {
+            Exchange ex = (Exchange) body;
+            ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+        }
+        if (body instanceof Message) {
+            Message msg = (Message) body;
+            msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList);
+        }
+    }
 }
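
Finally, a hedged sketch of how a step downstream of the producer could read what
setRecordMetadata(...) stores; the header is only populated when recordMetadata is
enabled on the endpoint, and the class and method names here are hypothetical.

    import java.util.List;

    import org.apache.camel.Exchange;
    import org.apache.camel.component.kafka.KafkaConstants;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public final class RecordMetadataReader {
        private RecordMetadataReader() {
        }

        @SuppressWarnings("unchecked")
        public static void logOffsets(Exchange exchange) {
            List<RecordMetadata> metadataList
                    = exchange.getMessage().getHeader(KafkaConstants.KAFKA_RECORDMETA, List.class);
            if (metadataList != null) {
                for (RecordMetadata metadata : metadataList) {
                    System.out.println(metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
                }
            }
        }
    }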