You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/05 08:29:16 UTC

[camel] branch master updated: CAMEL-14233: Fix Overridden topics when using aggregation on Exchanges or Messages

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c87c10  CAMEL-14233: Fix Overridden topics when using aggregation on Exchanges or Messages
     new b214d81  Merge pull request #3383 from omarsmak/CAMEL-14233
5c87c10 is described below

commit 5c87c103ccbf82dd339dadeb7dcb589e5e2fcc86
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 4 17:22:24 2019 +0100

    CAMEL-14233: Fix Overridden topics when using aggregation on Exchanges or Messages
---
 .../camel/component/kafka/KafkaProducer.java       |  17 +++-
 .../camel/component/kafka/KafkaProducerTest.java   | 100 +++++++++++++++++++++
 2 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 47ba7b0..c316774 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
@@ -190,14 +191,24 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 public ProducerRecord next() {
                     // must convert each entry of the iterator into the value according to the serializer
                     Object next = msgList.next();
+                    String innerTopic = msgTopic;
+
+                    if (next instanceof Exchange && ((Exchange) next).getIn().getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
+                        innerTopic = (String) ((Exchange) next).getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+                    }
+
+                    if (next instanceof Message && ((Message) next).getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
+                        innerTopic = (String) ((Message) next).removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+                    }
+
                     Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass());
 
                     if (hasPartitionKey && hasMessageKey) {
-                        return new ProducerRecord(msgTopic, partitionKey, null, key, value, propagatedHeaders);
+                        return new ProducerRecord(innerTopic, partitionKey, null, key, value, propagatedHeaders);
                     } else if (hasMessageKey) {
-                        return new ProducerRecord(msgTopic, null, null, key, value, propagatedHeaders);
+                        return new ProducerRecord(innerTopic, null, null, key, value, propagatedHeaders);
                     } else {
-                        return new ProducerRecord(msgTopic, null, null, null, value, propagatedHeaders);
+                        return new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders);
                     }
                 }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 93512fc..bda3ecc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,12 +16,16 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
+import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -29,6 +33,9 @@ import org.apache.camel.Message;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.DefaultMessage;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -303,6 +310,56 @@ public class KafkaProducerTest {
         assertRecordMetadataExists();
     }
 
+    @Test
+    public void processSendsMessageWithListOfExchangesWithOverrideTopicHeaderOnEveryExchange() throws Exception {
+        endpoint.getConfiguration().setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+
+        // we set the initial topic
+        in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+
+        // we add our exchanges in order to aggregate
+        final List<Exchange> nestedExchanges = createListOfExchangesWithTopics(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+
+        // aggregate
+        final Exchange finalAggregatedExchange = aggregateExchanges(nestedExchanges, new GroupedExchangeAggregationStrategy());
+
+        in.setBody(finalAggregatedExchange.getIn().getBody());
+        in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+        producer.process(exchange);
+
+        // assert results
+        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+        assertRecordMetadataExists(3);
+    }
+
+    @Test
+    public void processSendsMessageWithListOfMessagesWithOverrideTopicHeaderOnEveryExchange() throws Exception {
+        endpoint.getConfiguration().setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+
+        // we set the initial topic
+        in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+
+        // we add our exchanges in order to aggregate
+        final List<Exchange> nestedExchanges = createListOfExchangesWithTopics(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+
+        // aggregate
+        final Exchange finalAggregatedExchange = aggregateExchanges(nestedExchanges, new GroupedMessageAggregationStrategy());
+
+        in.setBody(finalAggregatedExchange.getIn().getBody());
+        in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+        producer.process(exchange);
+
+        // assert results
+        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+        assertRecordMetadataExists(3);
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void verifySendMessage(Integer partitionKey, String topic, String messageKey) {
         ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
@@ -327,10 +384,53 @@ public class KafkaProducerTest {
         assertEquals(topic, captor.getValue().topic());
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessages(final List<String> expectedTopics) {
+        final ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer(), Mockito.atLeast(expectedTopics.size())).send(captor.capture());
+        final List<String> actualTopics = captor.getAllValues()
+                .stream()
+                .map(ProducerRecord::topic)
+                .collect(Collectors.toList());
+
+        assertEquals(expectedTopics, actualTopics);
+    }
+
     private void assertRecordMetadataExists() {
         List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
         assertTrue(recordMetaData1 != null);
         assertEquals("Expected one recordMetaData", recordMetaData1.size(), 1);
         assertTrue(recordMetaData1.get(0) != null);
     }
+
+    private void assertRecordMetadataExists(final int numMetadata) {
+        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+        assertTrue(recordMetaData1 != null);
+        assertEquals("Expected one recordMetaData", recordMetaData1.size(), numMetadata);
+        assertTrue(recordMetaData1.get(0) != null);
+    }
+
+    private Exchange aggregateExchanges(final List<Exchange> exchangesToAggregate, final AggregationStrategy strategy) {
+        Exchange exchangeHolder = new DefaultExchange(camelContext);
+
+        for (final Exchange innerExchange: exchangesToAggregate) {
+            exchangeHolder = strategy.aggregate(exchangeHolder, innerExchange);
+        }
+
+        strategy.onCompletion(exchangeHolder);
+
+        return exchangeHolder;
+    }
+
+    private List<Exchange> createListOfExchangesWithTopics(final List<String> topics) {
+        final List<Exchange> resultLists = new LinkedList<>();
+
+        topics.forEach(topic -> {
+            final Exchange innerExchange = new DefaultExchange(camelContext);
+            innerExchange.getIn().setHeader(KafkaConstants.OVERRIDE_TOPIC, topic);
+            resultLists.add(innerExchange);
+        });
+
+        return resultLists;
+    }
 }