Posted to commits@camel.apache.org by da...@apache.org on 2020/01/09 11:10:55 UTC

[camel] branch master updated: CAMEL-14360 - Allow overriding of message key and partition key for aggregated kafka messages (#3459)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c2ebca  CAMEL-14360 - Allow overriding of message key and partition key for aggregated kafka messages (#3459)
9c2ebca is described below

commit 9c2ebca78db194edb554ef29370b7f0ec006bb03
Author: Rafał Gała <ra...@hotmail.com>
AuthorDate: Thu Jan 9 12:10:42 2020 +0100

    CAMEL-14360 - Allow overriding of message key and partition key for aggregated kafka messages (#3459)
    
    * CAMEL-14360 - Allow overriding of message key and partition key for aggregated kafka messages
    
    * CAMEL-14362 - Set metadata for each aggregated kafka message separately
    
    * CAMEL-14362 - Use KeyValueHolder instead of Pair
    
    * CAMEL-14362 - Code formatting changes and Exchange retrieval from Message instance
    
    * CAMEL-14362 - Additional check for null return value of getExchange
---
 .../camel/component/kafka/KafkaProducer.java       | 202 ++++++++++++++++-----
 .../camel/component/kafka/KafkaProducerTest.java   |  23 +++
 2 files changed, 179 insertions(+), 46 deletions(-)
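
In practice, this change means that when the message body is a list of
aggregated Exchanges or Messages, each element can carry its own
KafkaConstants.KEY, KafkaConstants.PARTITION_KEY and
KafkaConstants.OVERRIDE_TOPIC headers, and each element receives its own
KafkaConstants.KAFKA_RECORDMETA header after the send. A minimal route
sketch, not part of the commit, assuming a hypothetical "direct:start"
consumer and a local broker:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.camel.Message;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    import org.apache.camel.support.DefaultMessage;

    public class AggregatedKafkaRoute extends RouteBuilder {
        @Override
        public void configure() {
            from("direct:start")
                .process(exchange -> {
                    List<Message> batch = new ArrayList<>();
                    for (int i = 0; i < 3; i++) {
                        Message msg = new DefaultMessage(exchange.getContext());
                        msg.setBody("payload-" + i);
                        // per-element overrides, honoured per record by this commit
                        msg.setHeader(KafkaConstants.KEY, "key-" + i);
                        msg.setHeader(KafkaConstants.PARTITION_KEY, i % 2);
                        msg.setHeader(KafkaConstants.OVERRIDE_TOPIC, "topic-" + i);
                        batch.add(msg);
                    }
                    exchange.getIn().setBody(batch);
                })
                // hypothetical topic and broker address
                .to("kafka:defaultTopic?brokers=localhost:9092");
        }
    }

Note that, as in the diff below, a key or partitionKey configured on the
endpoint still takes precedence over the per-element headers.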

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index c316774..52fe0d3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -37,6 +38,7 @@ import org.apache.camel.Message;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.URISupport;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -139,7 +141,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws Exception {
+    protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception {
         String topic = endpoint.getConfiguration().getTopic();
 
         // must remove header so its not propagated
@@ -154,18 +156,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
             topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
         }
 
-        // endpoint take precedence over header configuration
-        final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null
-                ? endpoint.getConfiguration().getPartitionKey() : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
-        final boolean hasPartitionKey = partitionKey != null;
-
-        // endpoint take precedence over header configuration
-        Object key = endpoint.getConfiguration().getKey() != null
-                ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
-        final Object messageKey = key != null
-                ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null;
-        final boolean hasMessageKey = messageKey != null;
-
         // extracting headers which need to be propagated
         List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
 
@@ -181,34 +171,67 @@ public class KafkaProducer extends DefaultAsyncProducer {
         if (iterator != null) {
             final Iterator<Object> msgList = iterator;
             final String msgTopic = topic;
-            return new Iterator<ProducerRecord>() {
+            return new Iterator<KeyValueHolder<Object, ProducerRecord>>() {
                 @Override
                 public boolean hasNext() {
                     return msgList.hasNext();
                 }
 
                 @Override
-                public ProducerRecord next() {
+                public KeyValueHolder<Object, ProducerRecord> next() {
                     // must convert each entry of the iterator into the value according to the serializer
                     Object next = msgList.next();
                     String innerTopic = msgTopic;
+                    Object innerKey = null;
+                    Integer innerPartitionKey = null;
+                    boolean hasPartitionKey = false;
+                    boolean hasMessageKey = false;
+
+                    Object value = next;
+                    Exchange ex = null;
+                    Object body = next;
+
+                    if (next instanceof Exchange || next instanceof Message) {
+                        Exchange innerExchange = null;
+                        Message innerMessage = null;
+                        if (next instanceof Exchange) {
+                            innerExchange = (Exchange) next;
+                            innerMessage = innerExchange.getIn();
+                        } else {
+                            innerMessage = (Message) next;
+                        }
+
+                        if (innerMessage.getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
+                            innerTopic = (String) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+                        }
+
+                        if (innerMessage.getHeader(KafkaConstants.PARTITION_KEY) != null) {
+                            innerPartitionKey = endpoint.getConfiguration().getPartitionKey() != null
+                                ? endpoint.getConfiguration().getPartitionKey() : innerMessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+                            hasPartitionKey = innerPartitionKey != null;
+                        }
+
+                        if (innerMessage.getHeader(KafkaConstants.KEY) != null) {
+                            innerKey = endpoint.getConfiguration().getKey() != null
+                                ? endpoint.getConfiguration().getKey() : innerMessage.getHeader(KafkaConstants.KEY);
+
+                            final Object messageKey = innerKey != null
+                                ? tryConvertToSerializedType(innerExchange, innerKey, endpoint.getConfiguration().getKeySerializerClass()) : null;
+                            hasMessageKey = messageKey != null;
+                        }
+
+                        ex = innerExchange == null ? exchange : innerExchange;
+                        value = tryConvertToSerializedType(ex, innerMessage.getBody(),
+                            endpoint.getConfiguration().getSerializerClass());
 
-                    if (next instanceof Exchange && ((Exchange) next).getIn().getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
-                        innerTopic = (String) ((Exchange) next).getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
-                    }
-
-                    if (next instanceof Message && ((Message) next).getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
-                        innerTopic = (String) ((Message) next).removeHeader(KafkaConstants.OVERRIDE_TOPIC);
                     }
 
-                    Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass());
-
                     if (hasPartitionKey && hasMessageKey) {
-                        return new ProducerRecord(innerTopic, partitionKey, null, key, value, propagatedHeaders);
+                        return new KeyValueHolder(body, new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
                     } else if (hasMessageKey) {
-                        return new ProducerRecord(innerTopic, null, null, key, value, propagatedHeaders);
+                        return new KeyValueHolder(body, new ProducerRecord(innerTopic, null, null, innerKey, value, propagatedHeaders));
                     } else {
-                        return new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders);
+                        return new KeyValueHolder(body, new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders));
                     }
                 }
 
@@ -219,6 +242,18 @@ public class KafkaProducer extends DefaultAsyncProducer {
             };
         }
 
+        // endpoint takes precedence over header configuration
+        final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null
+            ? endpoint.getConfiguration().getPartitionKey() : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+        final boolean hasPartitionKey = partitionKey != null;
+
+        // endpoint takes precedence over header configuration
+        Object key = endpoint.getConfiguration().getKey() != null
+            ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
+        final Object messageKey = key != null
+            ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null;
+        final boolean hasMessageKey = messageKey != null;
+
         // must convert each entry of the iterator into the value according to the serializer
         Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getSerializerClass());
 
@@ -230,7 +265,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         } else {
             record = new ProducerRecord(topic, null, null, null, value, propagatedHeaders);
         }
-        return Collections.singletonList(record).iterator();
+        return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object)exchange, record)).iterator();
     }
 
     private List<Header> getPropagatedHeaders(Exchange exchange, KafkaConfiguration getConfiguration) {
@@ -259,8 +294,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings({"unchecked", "rawtypes"})
     // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
     public void process(Exchange exchange) throws Exception {
-        Iterator<ProducerRecord> c = createRecorder(exchange);
-        List<Future<RecordMetadata>> futures = new LinkedList<>();
+        Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
+        List<KeyValueHolder<Object, Future<RecordMetadata>>> futures = new LinkedList<>();
         List<RecordMetadata> recordMetadatas = new ArrayList<>();
 
         if (endpoint.getConfiguration().isRecordMetadata()) {
@@ -272,15 +307,39 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         while (c.hasNext()) {
-            ProducerRecord rec = c.next();
+            KeyValueHolder<Object, ProducerRecord> exrec = c.next();
+            ProducerRecord rec = exrec.getValue();
             if (log.isDebugEnabled()) {
                 log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
             }
-            futures.add(kafkaProducer.send(rec));
+            futures.add(new KeyValueHolder(exrec.getKey(), kafkaProducer.send(rec)));
         }
-        for (Future<RecordMetadata> f : futures) {
+        for (KeyValueHolder<Object, Future<RecordMetadata>> f : futures) {
             //wait for them all to be sent
-            recordMetadatas.add(f.get());
+            List<RecordMetadata> metadata = Collections.singletonList(f.getValue().get());
+            recordMetadatas.addAll(metadata);
+            Exchange innerExchange = null;
+            if (f.getKey() instanceof Exchange) {
+                innerExchange = (Exchange) f.getKey();
+                if (innerExchange != null) {
+                    if (endpoint.getConfiguration().isRecordMetadata()) {
+                        if (innerExchange.hasOut()) {
+                            innerExchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
+                        } else {
+                            innerExchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
+                        }
+                    }
+                }
+            }
+            Message innerMessage = null;
+            if (f.getKey() instanceof Message) {
+                innerMessage = (Message) f.getKey();
+                if (innerMessage != null) {
+                    if (endpoint.getConfiguration().isRecordMetadata()) {
+                        innerMessage.setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
+                    }
+                }
+            }
         }
     }
 
@@ -288,15 +347,20 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings({"unchecked", "rawtypes"})
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            Iterator<ProducerRecord> c = createRecorder(exchange);
+            Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
             KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
             while (c.hasNext()) {
                 cb.increment();
-                ProducerRecord rec = c.next();
+                KeyValueHolder<Object, ProducerRecord> exrec = c.next();
+                ProducerRecord rec = exrec.getValue();
                 if (log.isDebugEnabled()) {
                     log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
                 }
-                kafkaProducer.send(rec, cb);
+                List<Callback> delegates = new ArrayList<>(Arrays.asList(cb));
+                if (exrec.getKey() != null) {
+                    delegates.add(new KafkaProducerCallBack(exrec.getKey()));
+                }
+                kafkaProducer.send(rec, new DelegatingCallback(delegates.toArray(new Callback[0])));
             }
             return cb.allSent();
         } catch (Exception ex) {
@@ -312,6 +376,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
     protected Object tryConvertToSerializedType(Exchange exchange, Object object, String serializerClass) {
         Object answer = null;
 
+        if (exchange == null) {
+            return object;
+        }
+
         if (KafkaConstants.KAFKA_DEFAULT_SERIALIZER.equals(serializerClass)) {
             answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, object);
         } else if ("org.apache.kafka.common.serialization.ByteArraySerializer".equals(serializerClass)) {
@@ -329,25 +397,58 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return answer != null ? answer : object;
     }
 
+    private final class DelegatingCallback implements Callback {
+
+        private final List<Callback> callbacks;
+
+        public DelegatingCallback(Callback... callbacks) {
+            this.callbacks = Arrays.asList(callbacks);
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            callbacks.forEach(c -> c.onCompletion(metadata, exception));
+        }
+    }
+
     private final class KafkaProducerCallBack implements Callback {
 
-        private final Exchange exchange;
+        private final Object body;
         private final AsyncCallback callback;
         private final AtomicInteger count = new AtomicInteger(1);
         private final List<RecordMetadata> recordMetadatas = new ArrayList<>();
 
-        KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
-            this.exchange = exchange;
+        KafkaProducerCallBack(Object body, AsyncCallback callback) {
+            this.body = body;
             this.callback = callback;
             if (endpoint.getConfiguration().isRecordMetadata()) {
-                if (exchange.hasOut()) {
-                    exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
-                } else {
-                    exchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+                if (body instanceof Exchange) {
+                    Exchange ex = (Exchange) body;
+                    if (ex.hasOut()) {
+                        ex.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+                    } else {
+                        ex.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+                    }
+                }
+                if (body instanceof Message) {
+                    Message msg = (Message) body;
+                    msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
                 }
             }
         }
 
+        public KafkaProducerCallBack(Exchange exchange) {
+            this(exchange, null);
+        }
+
+        public KafkaProducerCallBack(Message message) {
+            this(message, null);
+        }
+
+        public KafkaProducerCallBack(Object body) {
+            this(body, null);
+        }
+
         void increment() {
             count.incrementAndGet();
         }
@@ -356,7 +457,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
             if (count.decrementAndGet() == 0) {
                 log.trace("All messages sent, continue routing.");
                 //was able to get all the work done while queuing the requests
-                callback.done(true);
+                if (callback != null) {
+                    callback.done(true);
+                }
                 return true;
             }
             return false;
@@ -365,7 +468,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
         @Override
         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
             if (e != null) {
-                exchange.setException(e);
+                if (body instanceof Exchange) {
+                    ((Exchange)body).setException(e);
+                }
+                if (body instanceof Message && ((Message)body).getExchange() != null) {
+                    ((Message)body).getExchange().setException(e);
+                }
             }
 
             recordMetadatas.add(recordMetadata);
@@ -377,7 +485,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
                     @Override
                     public void run() {
                         log.trace("All messages sent, continue routing.");
-                        callback.done(false);
+                        if (callback != null) {
+                            callback.done(false);
+                        }
                     }
                 });
             }
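
A note on the asynchronous path above: each record is now sent with a
DelegatingCallback that fans out to two callbacks. The shared, counting
KafkaProducerCallBack completes the route's AsyncCallback once every record
has been acknowledged, while a per-record KafkaProducerCallBack (created
with a null AsyncCallback) only stamps KAFKA_RECORDMETA and any exception
onto the Exchange or Message that produced the record. Since
org.apache.kafka.clients.producer.Callback has a single method, the fan-out
boils down to this sketch (sharedCb and perRecordCb are hypothetical names):

    // sharedCb counts down and completes the route's AsyncCallback;
    // perRecordCb records metadata/exceptions on the originating element.
    Callback fanOut = (metadata, exception) -> {
        sharedCb.onCompletion(metadata, exception);
        perRecordCb.onCompletion(metadata, exception);
    };
    kafkaProducer.send(record, fanOut);
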
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index bda3ecc..be5cb16 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -88,6 +88,7 @@ public class KafkaProducerTest {
         Mockito.when(context.getTypeConverter()).thenReturn(converter);
         Mockito.when(converter.tryConvertTo(String.class, exchange, null)).thenReturn(null);
         Mockito.when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
+        Mockito.when(camelContext.getTypeConverter()).thenReturn(converter);
 
         producer.setKafkaProducer(kp);
         producer.setWorkerPool(Executors.newFixedThreadPool(1));
@@ -333,6 +334,7 @@ public class KafkaProducerTest {
         // assert results
         verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
         assertRecordMetadataExists(3);
+        assertRecordMetadataExistsForEachAggregatedExchange();
     }
 
     @Test
@@ -358,6 +360,7 @@ public class KafkaProducerTest {
         // assert results
         verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
         assertRecordMetadataExists(3);
+        assertRecordMetadataExistsForEachAggregatedMessage();
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -410,6 +413,26 @@ public class KafkaProducerTest {
         assertTrue(recordMetaData1.get(0) != null);
     }
 
+    private void assertRecordMetadataExistsForEachAggregatedExchange() {
+        List<Exchange> exchanges = (List<Exchange>) in.getBody();
+        for (Exchange ex : exchanges) {
+            List<RecordMetadata> recordMetaData = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORDMETA);
+            assertTrue(recordMetaData != null);
+            assertEquals("Expected one recordMetaData", 1, recordMetaData.size());
+            assertTrue(recordMetaData.get(0) != null);
+        }
+    }
+
+    private void assertRecordMetadataExistsForEachAggregatedMessage() {
+        List<Message> messages = (List<Message>) in.getBody();
+        for (Message msg : messages) {
+            List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+            assertTrue(recordMetaData != null);
+            assertEquals("Expected one recordMetaData", 1, recordMetaData.size());
+            assertTrue(recordMetaData.get(0) != null);
+        }
+    }
+
     private Exchange aggregateExchanges(final List<Exchange> exchangesToAggregate, final AggregationStrategy strategy) {
         Exchange exchangeHolder = new DefaultExchange(camelContext);
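
With both changes in place, a caller can read the metadata back per
aggregated element after the send. A hedged usage sketch, assuming the body
is a List<Exchange> as in the tests above:

    // illustrative only; mirrors assertRecordMetadataExistsForEachAggregatedExchange
    @SuppressWarnings("unchecked")
    void logOffsets(Exchange exchange) {
        List<Exchange> aggregated = exchange.getIn().getBody(List.class);
        for (Exchange ex : aggregated) {
            List<RecordMetadata> meta = (List<RecordMetadata>) ex.getMessage()
                    .getHeader(KafkaConstants.KAFKA_RECORDMETA);
            RecordMetadata rm = meta.get(0); // one RecordMetadata per element
            System.out.printf("sent to %s-%d@%d%n", rm.topic(), rm.partition(), rm.offset());
        }
    }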