You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/09 11:10:55 UTC
[camel] branch master updated: CAMEL-14360 - Allow overriding of
message key and partition key for aggregated kafka messages (#3459)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 9c2ebca CAMEL-14360 - Allow overriding of message key and partition key for aggregated kafka messages (#3459)
9c2ebca is described below
commit 9c2ebca78db194edb554ef29370b7f0ec006bb03
Author: Rafał Gała <ra...@hotmail.com>
AuthorDate: Thu Jan 9 12:10:42 2020 +0100
CAMEL-14360 - Allow overriding of message key and partition key for aggregated kafka messages (#3459)
* CAMEL-14360 - Allow overriding of message key and partition key for aggregated kafka messages
* CAMEL-14362 - Set metadata for each aggregated kafka message separately
* CAMEL-14362 - Use KeyValueHolder instead of Pair
* CAMEL-14362 - Code formatting changes and Exchange retrieval from Message instance
* CAMEL-14362 - Additional check for null return value of getExchange
---
.../camel/component/kafka/KafkaProducer.java | 202 ++++++++++++++++-----
.../camel/component/kafka/KafkaProducerTest.java | 23 +++
2 files changed, 179 insertions(+), 46 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index c316774..52fe0d3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
@@ -37,6 +38,7 @@ import org.apache.camel.Message;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.URISupport;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -139,7 +141,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
@SuppressWarnings({"unchecked", "rawtypes"})
- protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws Exception {
+ protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception {
String topic = endpoint.getConfiguration().getTopic();
// must remove header so its not propagated
@@ -154,18 +156,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
}
- // endpoint take precedence over header configuration
- final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null
- ? endpoint.getConfiguration().getPartitionKey() : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
- final boolean hasPartitionKey = partitionKey != null;
-
- // endpoint take precedence over header configuration
- Object key = endpoint.getConfiguration().getKey() != null
- ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
- final Object messageKey = key != null
- ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null;
- final boolean hasMessageKey = messageKey != null;
-
// extracting headers which need to be propagated
List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
@@ -181,34 +171,67 @@ public class KafkaProducer extends DefaultAsyncProducer {
if (iterator != null) {
final Iterator<Object> msgList = iterator;
final String msgTopic = topic;
- return new Iterator<ProducerRecord>() {
+ return new Iterator<KeyValueHolder<Object, ProducerRecord>>() {
@Override
public boolean hasNext() {
return msgList.hasNext();
}
@Override
- public ProducerRecord next() {
+ public KeyValueHolder<Object, ProducerRecord> next() {
// must convert each entry of the iterator into the value according to the serializer
Object next = msgList.next();
String innerTopic = msgTopic;
+ Object innerKey = null;
+ Integer innerPartitionKey = null;
+ boolean hasPartitionKey = false;
+ boolean hasMessageKey = false;
+
+ Object value = next;
+ Exchange ex = null;
+ Object body = next;
+
+ if (next instanceof Exchange || next instanceof Message) {
+ Exchange innerExchange = null;
+ Message innerMmessage = null;
+ if (next instanceof Exchange) {
+ innerExchange = (Exchange) next;
+ innerMmessage = innerExchange.getIn();
+ } else {
+ innerMmessage = (Message) next;
+ }
+
+ if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
+ innerTopic = (String) innerMmessage.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+ }
+
+ if (innerMmessage.getHeader(KafkaConstants.PARTITION_KEY) != null) {
+ innerPartitionKey = endpoint.getConfiguration().getPartitionKey() != null
+ ? endpoint.getConfiguration().getPartitionKey() : innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+ hasPartitionKey = innerPartitionKey != null;
+ }
+
+ if (innerMmessage.getHeader(KafkaConstants.KEY) != null) {
+ innerKey = endpoint.getConfiguration().getKey() != null
+ ? endpoint.getConfiguration().getKey() : innerMmessage.getHeader(KafkaConstants.KEY);
+
+ final Object messageKey = innerKey != null
+ ? tryConvertToSerializedType(innerExchange, innerKey, endpoint.getConfiguration().getKeySerializerClass()) : null;
+ hasMessageKey = messageKey != null;
+ }
+
+ ex = innerExchange == null ? exchange : innerExchange;
+ value = tryConvertToSerializedType(ex, innerMmessage.getBody(),
+ endpoint.getConfiguration().getSerializerClass());
- if (next instanceof Exchange && ((Exchange) next).getIn().getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
- innerTopic = (String) ((Exchange) next).getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
- }
-
- if (next instanceof Message && ((Message) next).getHeader(KafkaConstants.OVERRIDE_TOPIC) != null) {
- innerTopic = (String) ((Message) next).removeHeader(KafkaConstants.OVERRIDE_TOPIC);
}
- Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass());
-
if (hasPartitionKey && hasMessageKey) {
- return new ProducerRecord(innerTopic, partitionKey, null, key, value, propagatedHeaders);
+ return new KeyValueHolder(body, new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
} else if (hasMessageKey) {
- return new ProducerRecord(innerTopic, null, null, key, value, propagatedHeaders);
+ return new KeyValueHolder(body, new ProducerRecord(innerTopic, null, null, innerKey, value, propagatedHeaders));
} else {
- return new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders);
+ return new KeyValueHolder(body, new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders));
}
}
@@ -219,6 +242,18 @@ public class KafkaProducer extends DefaultAsyncProducer {
};
}
+ // endpoint take precedence over header configuration
+ final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null
+ ? endpoint.getConfiguration().getPartitionKey() : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+ final boolean hasPartitionKey = partitionKey != null;
+
+ // endpoint take precedence over header configuration
+ Object key = endpoint.getConfiguration().getKey() != null
+ ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
+ final Object messageKey = key != null
+ ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null;
+ final boolean hasMessageKey = messageKey != null;
+
// must convert each entry of the iterator into the value according to the serializer
Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getSerializerClass());
@@ -230,7 +265,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
} else {
record = new ProducerRecord(topic, null, null, null, value, propagatedHeaders);
}
- return Collections.singletonList(record).iterator();
+ return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object)exchange, record)).iterator();
}
private List<Header> getPropagatedHeaders(Exchange exchange, KafkaConfiguration getConfiguration) {
@@ -259,8 +294,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
@SuppressWarnings({"unchecked", "rawtypes"})
// Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
public void process(Exchange exchange) throws Exception {
- Iterator<ProducerRecord> c = createRecorder(exchange);
- List<Future<RecordMetadata>> futures = new LinkedList<>();
+ Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
+ List<KeyValueHolder<Object, Future<RecordMetadata>>> futures = new LinkedList<>();
List<RecordMetadata> recordMetadatas = new ArrayList<>();
if (endpoint.getConfiguration().isRecordMetadata()) {
@@ -272,15 +307,39 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
while (c.hasNext()) {
- ProducerRecord rec = c.next();
+ KeyValueHolder<Object, ProducerRecord> exrec = c.next();
+ ProducerRecord rec = exrec.getValue();
if (log.isDebugEnabled()) {
log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
}
- futures.add(kafkaProducer.send(rec));
+ futures.add(new KeyValueHolder(exrec.getKey(), kafkaProducer.send(rec)));
}
- for (Future<RecordMetadata> f : futures) {
+ for (KeyValueHolder<Object, Future<RecordMetadata>> f : futures) {
//wait for them all to be sent
- recordMetadatas.add(f.get());
+ List<RecordMetadata> metadata = Collections.singletonList(f.getValue().get());
+ recordMetadatas.addAll(metadata);
+ Exchange innerExchange = null;
+ if (f.getKey() instanceof Exchange) {
+ innerExchange = (Exchange) f.getKey();
+ if (innerExchange != null) {
+ if (endpoint.getConfiguration().isRecordMetadata()) {
+ if (innerExchange.hasOut()) {
+ innerExchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
+ } else {
+ innerExchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
+ }
+ }
+ }
+ }
+ Message innerMessage = null;
+ if (f.getKey() instanceof Message) {
+ innerMessage = (Message) f.getKey();
+ if (innerMessage != null) {
+ if (endpoint.getConfiguration().isRecordMetadata()) {
+ innerMessage.setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
+ }
+ }
+ }
}
}
@@ -288,15 +347,20 @@ public class KafkaProducer extends DefaultAsyncProducer {
@SuppressWarnings({"unchecked", "rawtypes"})
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- Iterator<ProducerRecord> c = createRecorder(exchange);
+ Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange);
KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
while (c.hasNext()) {
cb.increment();
- ProducerRecord rec = c.next();
+ KeyValueHolder<Object, ProducerRecord> exrec = c.next();
+ ProducerRecord rec = exrec.getValue();
if (log.isDebugEnabled()) {
log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
}
- kafkaProducer.send(rec, cb);
+ List<Callback> delegates = new ArrayList<>(Arrays.asList(cb));
+ if (exrec.getKey() != null) {
+ delegates.add(new KafkaProducerCallBack(exrec.getKey()));
+ }
+ kafkaProducer.send(rec, new DelegatingCallback(delegates.toArray(new Callback[0])));
}
return cb.allSent();
} catch (Exception ex) {
@@ -312,6 +376,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
protected Object tryConvertToSerializedType(Exchange exchange, Object object, String serializerClass) {
Object answer = null;
+ if (exchange == null) {
+ return object;
+ }
+
if (KafkaConstants.KAFKA_DEFAULT_SERIALIZER.equals(serializerClass)) {
answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, object);
} else if ("org.apache.kafka.common.serialization.ByteArraySerializer".equals(serializerClass)) {
@@ -329,25 +397,58 @@ public class KafkaProducer extends DefaultAsyncProducer {
return answer != null ? answer : object;
}
+ private final class DelegatingCallback implements Callback {
+
+ private final List<Callback> callbacks;
+
+ public DelegatingCallback(Callback... callbacks) {
+ this.callbacks = Arrays.asList(callbacks);
+ }
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ callbacks.forEach(c -> c.onCompletion(metadata, exception));
+ }
+ }
+
private final class KafkaProducerCallBack implements Callback {
- private final Exchange exchange;
+ private final Object body;
private final AsyncCallback callback;
private final AtomicInteger count = new AtomicInteger(1);
private final List<RecordMetadata> recordMetadatas = new ArrayList<>();
- KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
- this.exchange = exchange;
+ KafkaProducerCallBack(Object body, AsyncCallback callback) {
+ this.body = body;
this.callback = callback;
if (endpoint.getConfiguration().isRecordMetadata()) {
- if (exchange.hasOut()) {
- exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
- } else {
- exchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+ if (body instanceof Exchange) {
+ Exchange ex = (Exchange) body;
+ if (ex.hasOut()) {
+ ex.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+ } else {
+ ex.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+ }
+ }
+ if (body instanceof Message) {
+ Message msg = (Message) body;
+ msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
}
}
}
+ public KafkaProducerCallBack(Exchange exchange) {
+ this(exchange, null);
+ }
+
+ public KafkaProducerCallBack(Message message) {
+ this(message, null);
+ }
+
+ public KafkaProducerCallBack(Object body) {
+ this(body, null);
+ }
+
void increment() {
count.incrementAndGet();
}
@@ -356,7 +457,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
if (count.decrementAndGet() == 0) {
log.trace("All messages sent, continue routing.");
//was able to get all the work done while queuing the requests
- callback.done(true);
+ if (callback != null) {
+ callback.done(true);
+ }
return true;
}
return false;
@@ -365,7 +468,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
- exchange.setException(e);
+ if (body instanceof Exchange) {
+ ((Exchange)body).setException(e);
+ }
+ if (body instanceof Message && ((Message)body).getExchange() != null) {
+ ((Message)body).getExchange().setException(e);
+ }
}
recordMetadatas.add(recordMetadata);
@@ -377,7 +485,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
@Override
public void run() {
log.trace("All messages sent, continue routing.");
- callback.done(false);
+ if (callback != null) {
+ callback.done(false);
+ }
}
});
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index bda3ecc..be5cb16 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -88,6 +88,7 @@ public class KafkaProducerTest {
Mockito.when(context.getTypeConverter()).thenReturn(converter);
Mockito.when(converter.tryConvertTo(String.class, exchange, null)).thenReturn(null);
Mockito.when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
+ Mockito.when(camelContext.getTypeConverter()).thenReturn(converter);
producer.setKafkaProducer(kp);
producer.setWorkerPool(Executors.newFixedThreadPool(1));
@@ -333,6 +334,7 @@ public class KafkaProducerTest {
// assert results
verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
assertRecordMetadataExists(3);
+ assertRecordMetadataExistsForEachAggregatedExchange();
}
@Test
@@ -358,6 +360,7 @@ public class KafkaProducerTest {
// assert results
verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", "overridenTopic3"));
assertRecordMetadataExists(3);
+ assertRecordMetadataExistsForEachAggregatedMessage();
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -410,6 +413,26 @@ public class KafkaProducerTest {
assertTrue(recordMetaData1.get(0) != null);
}
+ private void assertRecordMetadataExistsForEachAggregatedExchange() {
+ List<Exchange> exchanges = (List<Exchange>) in.getBody();
+ for (Exchange ex : exchanges) {
+ List<RecordMetadata> recordMetaData = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ assertTrue(recordMetaData != null);
+ assertEquals("Expected one recordMetaData", recordMetaData.size(), 1);
+ assertTrue(recordMetaData.get(0) != null);
+ }
+ }
+
+ private void assertRecordMetadataExistsForEachAggregatedMessage() {
+ List<Message> messages = (List<Message>) in.getBody();
+ for (Message msg : messages) {
+ List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ assertTrue(recordMetaData != null);
+ assertEquals("Expected one recordMetaData", recordMetaData.size(), 1);
+ assertTrue(recordMetaData.get(0) != null);
+ }
+ }
+
private Exchange aggregateExchanges(final List<Exchange> exchangesToAggregate, final AggregationStrategy strategy) {
Exchange exchangeHolder = new DefaultExchange(camelContext);