You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/28 13:26:11 UTC

[camel] branch master updated: CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY (#5263)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 73913a9  CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY (#5263)
73913a9 is described below

commit 73913a9a1d65394743c30ffcb263cfa8dee980a7
Author: jenskordowski <10...@users.noreply.github.com>
AuthorDate: Sun Mar 28 15:25:49 2021 +0200

    CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY (#5263)
    
    * CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY
    
    * CAMEL-16060 test fixes and documentation update
    
    Co-authored-by: Jens Kordowski <je...@sap.com>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  2 +-
 .../camel/component/kafka/KafkaProducer.java       | 43 ++++++----------------
 .../component/kafka/KafkaProducerFullTest.java     | 16 ++++----
 .../camel/component/kafka/KafkaProducerTest.java   | 21 +++++++++++
 4 files changed, 41 insertions(+), 41 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 81ab68c..5dddf19 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -310,7 +310,7 @@ Before sending a message to Kafka you can configure the following headers.
 | Header constant              | Header value          | Type    | Description
 | KafkaConstants.KEY           | "kafka.KEY"           | Object  | *Required* The key of the message in order to ensure that all related message goes in the same partition
 | KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String  | The topic to which send the message (override and takes precedence), and the header is not preserved.
-| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
+| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition
 |===
 
 If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index a8ef383..32aa440 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -189,8 +189,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
                     String innerTopic = msgTopic;
                     Object innerKey = null;
                     Integer innerPartitionKey = null;
-                    boolean hasPartitionKey = false;
-                    boolean hasMessageKey = false;
 
                     Object value = next;
                     Exchange ex = null;
@@ -214,18 +212,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
                             innerPartitionKey = endpoint.getConfiguration().getPartitionKey() != null
                                     ? endpoint.getConfiguration().getPartitionKey()
                                     : innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
-                            hasPartitionKey = innerPartitionKey != null;
                         }
 
                         if (innerMmessage.getHeader(KafkaConstants.KEY) != null) {
                             innerKey = endpoint.getConfiguration().getKey() != null
                                     ? endpoint.getConfiguration().getKey() : innerMmessage.getHeader(KafkaConstants.KEY);
-
-                            final Object messageKey = innerKey != null
-                                    ? tryConvertToSerializedType(innerExchange, innerKey,
-                                            endpoint.getConfiguration().getKeySerializer())
-                                    : null;
-                            hasMessageKey = messageKey != null;
+                            if (innerKey != null) {
+                                innerKey = tryConvertToSerializedType(innerExchange, innerKey,
+                                        endpoint.getConfiguration().getKeySerializer());
+                            }
                         }
 
                         ex = innerExchange == null ? exchange : innerExchange;
@@ -234,17 +229,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
                     }
 
-                    if (hasPartitionKey && hasMessageKey) {
-                        return new KeyValueHolder(
-                                body,
-                                new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
-                    } else if (hasMessageKey) {
-                        return new KeyValueHolder(
-                                body, new ProducerRecord(innerTopic, null, null, innerKey, value, propagatedHeaders));
-                    } else {
-                        return new KeyValueHolder(
-                                body, new ProducerRecord(innerTopic, null, null, null, value, propagatedHeaders));
-                    }
+                    return new KeyValueHolder(
+                            body,
+                            new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
                 }
 
                 @Override
@@ -258,27 +245,19 @@ public class KafkaProducer extends DefaultAsyncProducer {
         final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null
                 ? endpoint.getConfiguration().getPartitionKey()
                 : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
-        final boolean hasPartitionKey = partitionKey != null;
 
         // endpoint take precedence over header configuration
         Object key = endpoint.getConfiguration().getKey() != null
                 ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
-        final Object messageKey = key != null
-                ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer()) : null;
-        final boolean hasMessageKey = messageKey != null;
+        if (key != null) {
+            key = tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer());
+        }
 
         // must convert each entry of the iterator into the value according to
         // the serializer
         Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer());
 
-        ProducerRecord record;
-        if (hasPartitionKey && hasMessageKey) {
-            record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
-        } else if (hasMessageKey) {
-            record = new ProducerRecord(topic, null, null, key, value, propagatedHeaders);
-        } else {
-            record = new ProducerRecord(topic, null, null, null, value, propagatedHeaders);
-        }
+        ProducerRecord record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
         return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator();
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index cb7015b..f3b6559 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -73,7 +73,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     @EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
     private Endpoint toStrings;
 
-    @EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1&partitionKey=1")
+    @EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1&partitionKey=0")
     private Endpoint toStrings2;
 
     @EndpointInject("kafka:" + TOPIC_INTERCEPTED + "?requestRequiredAcks=-1"
@@ -163,9 +163,9 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
-        sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "0");
         sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic",
-                KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
 
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
@@ -196,7 +196,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         sendMessagesInRoute(messageInTopic, stringsTemplate2, "IT test message", (String[]) null);
         sendMessagesInRoute(messageInOtherTopic, stringsTemplate2, "IT test message in other topic",
-                KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
 
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
@@ -225,9 +225,9 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
-        sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "0");
         sendMessagesInRoute(messageInOtherTopic, interceptedTemplate, "IT test message in other topic",
-                KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_INTERCEPTED, TOPIC_STRINGS_IN_HEADER, messagesLatch);
 
@@ -251,12 +251,12 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
             msgs.add("Message " + x);
         }
 
-        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "0");
         msgs = new ArrayList<>();
         for (int x = 0; x < messageInOtherTopic; x++) {
             msgs.add("Other Message " + x);
         }
-        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
 
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index a12aec6..2b812e5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -277,6 +277,19 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void processSendsMessageWithPartitionKeyHeaderOnly() throws Exception {
+        endpoint.getConfiguration().setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(out);
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
+
+        producer.process(exchange);
+
+        verifySendMessage(4, "someTopic");
+        assertRecordMetadataExists();
+    }
+
+    @Test
     public void processSendsMessageWithMessageKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
@@ -380,6 +393,14 @@ public class KafkaProducerTest {
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected void verifySendMessage(Integer partitionKey, String topic) {
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(partitionKey, captor.getValue().partition());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     protected void verifySendMessage(String topic, String messageKey) {
         ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
         Mockito.verify(producer.getKafkaProducer()).send(captor.capture());