You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/28 13:26:11 UTC
[camel] branch master updated: CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY (#5263)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 73913a9 CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY (#5263)
73913a9 is described below
commit 73913a9a1d65394743c30ffcb263cfa8dee980a7
Author: jenskordowski <10...@users.noreply.github.com>
AuthorDate: Sun Mar 28 15:25:49 2021 +0200
CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY (#5263)
* CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY
* CAMEL-16060 test fixes and documentation update
Co-authored-by: Jens Kordowski <je...@sap.com>
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 2 +-
.../camel/component/kafka/KafkaProducer.java | 43 ++++++----------------
.../component/kafka/KafkaProducerFullTest.java | 16 ++++----
.../camel/component/kafka/KafkaProducerTest.java | 21 +++++++++++
4 files changed, 41 insertions(+), 41 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 81ab68c..5dddf19 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -310,7 +310,7 @@ Before sending a message to Kafka you can configure the following headers.
| Header constant | Header value | Type | Description
| KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition
| KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String | The topic to which send the message (override and takes precedence), and the header is not preserved.
-| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
+| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition
|===
If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index a8ef383..32aa440 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -189,8 +189,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
String innerTopic = msgTopic;
Object innerKey = null;
Integer innerPartitionKey = null;
- boolean hasPartitionKey = false;
- boolean hasMessageKey = false;
Object value = next;
Exchange ex = null;
@@ -214,18 +212,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
innerPartitionKey = endpoint.getConfiguration().getPartitionKey() != null
? endpoint.getConfiguration().getPartitionKey()
: innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
- hasPartitionKey = innerPartitionKey != null;
}
if (innerMmessage.getHeader(KafkaConstants.KEY) != null) {
innerKey = endpoint.getConfiguration().getKey() != null
? endpoint.getConfiguration().getKey() : innerMmessage.getHeader(KafkaConstants.KEY);
-
- final Object messageKey = innerKey != null
- ? tryConvertToSerializedType(innerExchange, innerKey,
- endpoint.getConfiguration().getKeySerializer())
- : null;
- hasMessageKey = messageKey != null;
+ if (innerKey != null) {
+ innerKey = tryConvertToSerializedType(innerExchange, innerKey,
+ endpoint.getConfiguration().getKeySerializer());
+ }
}
ex = innerExchange == null ? exchange : innerExchange;
@@ -234,17 +229,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
- if (hasPartitionKey && hasMessageKey) {
- return new KeyValueHolder(
- body,
- new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
- } else if (hasMessageKey) {
- return new KeyValueHolder(
- body, new ProducerRecord(innerTopic, null, null, innerKey, value, propagatedHeaders));
- } else {
- return new KeyValueHolder(
- body, new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders));
- }
+ return new KeyValueHolder(
+ body,
+ new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
}
@Override
@@ -258,27 +245,19 @@ public class KafkaProducer extends DefaultAsyncProducer {
final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null
? endpoint.getConfiguration().getPartitionKey()
: exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
- final boolean hasPartitionKey = partitionKey != null;
// endpoint take precedence over header configuration
Object key = endpoint.getConfiguration().getKey() != null
? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
- final Object messageKey = key != null
- ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer()) : null;
- final boolean hasMessageKey = messageKey != null;
+ if (key != null) {
+ key = tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer());
+ }
// must convert each entry of the iterator into the value according to
// the serializer
Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer());
- ProducerRecord record;
- if (hasPartitionKey && hasMessageKey) {
- record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
- } else if (hasMessageKey) {
- record = new ProducerRecord(topic, null, null, key, value, propagatedHeaders);
- } else {
- record = new ProducerRecord(topic, null, null, null, value, propagatedHeaders);
- }
+ ProducerRecord record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator();
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index cb7015b..f3b6559 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -73,7 +73,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
@EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
private Endpoint toStrings;
- @EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1&partitionKey=1")
+ @EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1&partitionKey=0")
private Endpoint toStrings2;
@EndpointInject("kafka:" + TOPIC_INTERCEPTED + "?requestRequiredAcks=-1"
@@ -163,9 +163,9 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
- sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+ sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "0");
sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic",
- KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+ KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
@@ -196,7 +196,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
sendMessagesInRoute(messageInTopic, stringsTemplate2, "IT test message", (String[]) null);
sendMessagesInRoute(messageInOtherTopic, stringsTemplate2, "IT test message in other topic",
- KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+ KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
@@ -225,9 +225,9 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
- sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+ sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "0");
sendMessagesInRoute(messageInOtherTopic, interceptedTemplate, "IT test message in other topic",
- KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+ KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_INTERCEPTED, TOPIC_STRINGS_IN_HEADER, messagesLatch);
@@ -251,12 +251,12 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
msgs.add("Message " + x);
}
- sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
+ sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "0");
msgs = new ArrayList<>();
for (int x = 0; x < messageInOtherTopic; x++) {
msgs.add("Other Message " + x);
}
- sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+ sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
TOPIC_STRINGS_IN_HEADER);
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index a12aec6..2b812e5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -277,6 +277,19 @@ public class KafkaProducerTest {
}
@Test
+ public void processSendsMessageWithPartitionKeyHeaderOnly() throws Exception {
+ endpoint.getConfiguration().setTopic("someTopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(out);
+ in.setHeader(KafkaConstants.PARTITION_KEY, 4);
+
+ producer.process(exchange);
+
+ verifySendMessage(4, "someTopic");
+ assertRecordMetadataExists();
+ }
+
+ @Test
public void processSendsMessageWithMessageKeyHeader() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
@@ -380,6 +393,14 @@ public class KafkaProducerTest {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
+ protected void verifySendMessage(Integer partitionKey, String topic) {
+ ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+ Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+ assertEquals(partitionKey, captor.getValue().partition());
+ assertEquals(topic, captor.getValue().topic());
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
protected void verifySendMessage(String topic, String messageKey) {
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
Mockito.verify(producer.getKafkaProducer()).send(captor.capture());