You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/01 10:20:44 UTC

[camel] 01/02: CAMEL-13878: Message is forwarded to the wrong Kafka Topic.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a00e82de71e9980f9d354666eec3964a54c523a3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Oct 1 12:00:20 2019 +0200

    CAMEL-13878: Message is forwarded to the wrong Kafka Topic.
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  7 +++++++
 .../camel/component/kafka/KafkaConstants.java      |  1 +
 .../camel/component/kafka/KafkaProducer.java       |  7 ++++++-
 .../camel/component/kafka/KafkaProducerTest.java   | 24 ++++++++++++++++++++++
 4 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 1e9968d..1d2182e 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -333,9 +333,16 @@ Before sending a message to Kafka you can configure the following headers.
 | Header constant              | Header value          | Type    | Description
 | KafkaConstants.KEY           | "kafka.KEY"           | Object  | *Required* The key of the message in order to ensure that all related message goes in the same partition
 | KafkaConstants.TOPIC         | "kafka.TOPIC"         | String  | The topic to which send the message (only read if the `bridgeEndpoint` endpoint parameter is `true`)
+| KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String  | The topic to which send the message (override and takes precedence), and the header is not preserved. |
 | KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
 |===
 
+If you want to send a message to a dynamic topic then favour using `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
+that are not send along the message, as its removed in the producer. If you are using `KafkaConstants.TOPIC` then the header
+is propagated in the producer, which means that if the consumer is also Camel and you want to route to a 3rd topic then
+this header may interfere and route to a previous topic, which is not your intention. Therefore you need to turn on the bridgeEndpoint option;
+or better just use the `KafkaConstants.OVERRIDE_TOPIC` header instead.
+
 After the message is sent to Kafka, the following headers are available
 [width="100%",cols="2m,2m,1m,5",options="header"]
 |===
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 0d1468d..ecda3bf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -22,6 +22,7 @@ public final class KafkaConstants {
     public static final String PARTITION = "kafka.PARTITION";
     public static final String KEY = "kafka.KEY";
     public static final String TOPIC = "kafka.TOPIC";
+    public static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
     public static final String OFFSET = "kafka.OFFSET";
     public static final String HEADERS = "kafka.HEADERS";
     public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index c43d824..ea3c8a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -142,7 +142,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
     protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws Exception {
         String topic = endpoint.getConfiguration().getTopic();
 
-        if (!endpoint.getConfiguration().isBridgeEndpoint()) {
+        // must remove header so its not propagated
+        Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+        if (overrideTopic != null) {
+            log.debug("Using override topic: {}", overrideTopic);
+            topic = overrideTopic.toString();
+        } else if (!endpoint.getConfiguration().isBridgeEndpoint()) {
             String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
             boolean allowHeader = true;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index a67aaf9..e3120fa 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -40,6 +40,8 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -184,6 +186,28 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
+        // the header is preserved
+        assertNotNull(in.getHeader(KafkaConstants.TOPIC));
+
+        verifySendMessage(4, "anotherTopic", "someKey");
+        assertRecordMetadataExists();
+    }
+
+    @Test
+    public void processSendsMessageWithOverrideTopicHeaderAndEndPoint() throws Exception {
+        endpoint.getConfiguration().setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
+        in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+
+        producer.process(exchange);
+
+        // the header is now removed
+        assertNull(in.getHeader(KafkaConstants.OVERRIDE_TOPIC));
+
         verifySendMessage(4, "anotherTopic", "someKey");
         assertRecordMetadataExists();
     }