You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/05 08:29:16 UTC
[camel] branch master updated: CAMEL-14233: Fix Overriden topics
when using aggregation on Exchanges or Messages
This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5c87c10 CAMEL-14233: Fix Overriden topics when using aggregation on Exchanges or Messages
new b214d81 Merge pull request #3383 from omarsmak/CAMEL-14233
5c87c10 is described below
commit 5c87c103ccbf82dd339dadeb7dcb589e5e2fcc86
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Dec 4 17:22:24 2019 +0100
CAMEL-14233: Fix Overriden topics when using aggregation on Exchanges or Messages
---
.../camel/component/kafka/KafkaProducer.java | 17 +++-
.../camel/component/kafka/KafkaProducerTest.java | 100 +++++++++++++++++++++
2 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 47ba7b0..c316774 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
@@ -190,14 +191,24 @@ public class KafkaProducer extends DefaultAsyncProducer {
public ProducerRecord next() {
// must convert each entry of the iterator into the value according to the serializer
Object next = msgList.next();
+ String innerTopic = msgTopic;
+
+ if (next instanceof Exchange && ((Exchange) next).getIn().getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
+ innerTopic = (String) ((Exchange) next).getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+ }
+
+ if (next instanceof Message && ((Message) next).getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
+ innerTopic = (String) ((Message) next).removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+ }
+
Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass());
if (hasPartitionKey && hasMessageKey) {
- return new ProducerRecord(msgTopic, partitionKey, null, key, value, propagatedHeaders);
+ return new ProducerRecord(innerTopic, partitionKey, null, key, value, propagatedHeaders);
} else if (hasMessageKey) {
- return new ProducerRecord(msgTopic, null, null, key, value, propagatedHeaders);
+ return new ProducerRecord(innerTopic, null, null, key, value, propagatedHeaders);
} else {
- return new ProducerRecord(msgTopic, null, null, null, value, propagatedHeaders);
+ return new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 93512fc..bda3ecc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,12 +16,16 @@
*/
package org.apache.camel.component.kafka;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -29,6 +33,9 @@ import org.apache.camel.Message;
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy;
+import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.DefaultMessage;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -303,6 +310,56 @@ public class KafkaProducerTest {
assertRecordMetadataExists();
}
+ @Test
+ public void processSendsMessageWithListOfExchangesWithOverrideTopicHeaderOnEveryExchange() throws Exception {
+ endpoint.getConfiguration().setTopic("someTopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+
+ // we set the initial topic
+ in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+ in.setHeader(KafkaConstants.KEY, "someKey");
+
+ // we add our exchanges in order to aggregate
+ final List<Exchange> nestedExchanges = createListOfExchangesWithTopics(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+
+ // aggregate
+ final Exchange finalAggregatedExchange = aggregateExchanges(nestedExchanges, new GroupedExchangeAggregationStrategy());
+
+ in.setBody(finalAggregatedExchange.getIn().getBody());
+ in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+ producer.process(exchange);
+
+ // assert results
+ verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+ assertRecordMetadataExists(3);
+ }
+
+ @Test
+ public void processSendsMessageWithListOfMessagesWithOverrideTopicHeaderOnEveryExchange() throws Exception {
+ endpoint.getConfiguration().setTopic("someTopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+
+ // we set the initial topic
+ in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+ in.setHeader(KafkaConstants.KEY, "someKey");
+
+ // we add our exchanges in order to aggregate
+ final List<Exchange> nestedExchanges = createListOfExchangesWithTopics(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+
+ // aggregate
+ final Exchange finalAggregatedExchange = aggregateExchanges(nestedExchanges, new GroupedMessageAggregationStrategy());
+
+ in.setBody(finalAggregatedExchange.getIn().getBody());
+ in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+ producer.process(exchange);
+
+ // assert results
+ verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
+ assertRecordMetadataExists(3);
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
protected void verifySendMessage(Integer partitionKey, String topic, String messageKey) {
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
@@ -327,10 +384,53 @@ public class KafkaProducerTest {
assertEquals(topic, captor.getValue().topic());
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected void verifySendMessages(final List<String> expectedTopics) {
+ final ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+ Mockito.verify(producer.getKafkaProducer(), Mockito.atLeast(expectedTopics.size())).send(captor.capture());
+ final List<String> actualTopics = captor.getAllValues()
+ .stream()
+ .map(ProducerRecord::topic)
+ .collect(Collectors.toList());
+
+ assertEquals(expectedTopics, actualTopics);
+ }
+
private void assertRecordMetadataExists() {
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
assertTrue(recordMetaData1 != null);
assertEquals("Expected one recordMetaData", recordMetaData1.size(), 1);
assertTrue(recordMetaData1.get(0) != null);
}
+
+ private void assertRecordMetadataExists(final int numMetadata) {
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ assertTrue(recordMetaData1 != null);
+ assertEquals("Expected one recordMetaData", recordMetaData1.size(), numMetadata);
+ assertTrue(recordMetaData1.get(0) != null);
+ }
+
+ private Exchange aggregateExchanges(final List<Exchange> exchangesToAggregate, final AggregationStrategy strategy) {
+ Exchange exchangeHolder = new DefaultExchange(camelContext);
+
+ for (final Exchange innerExchange: exchangesToAggregate) {
+ exchangeHolder = strategy.aggregate(exchangeHolder, innerExchange);
+ }
+
+ strategy.onCompletion(exchangeHolder);
+
+ return exchangeHolder;
+ }
+
+ private List<Exchange> createListOfExchangesWithTopics(final List<String> topics) {
+ final List<Exchange> resultLists = new LinkedList<>();
+
+ topics.forEach(topic -> {
+ final Exchange innerExchange = new DefaultExchange(camelContext);
+ innerExchange.getIn().setHeader(KafkaConstants.OVERRIDE_TOPIC, topic);
+ resultLists.add(innerExchange);
+ });
+
+ return resultLists;
+ }
}