You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/01 10:20:45 UTC

[camel] 02/02: CAMEL-13878: Message is forwarded to the wrong Kafka Topic.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 34259dcfa79a5e60fad86ee0f32be977d499b2da
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Oct 1 12:20:19 2019 +0200

    CAMEL-13878: Message is forwarded to the wrong Kafka Topic.
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  8 ++---
 .../camel/component/kafka/KafkaConfiguration.java  | 36 -------------------
 .../camel/component/kafka/KafkaProducer.java       | 21 -----------
 .../camel/component/kafka/KafkaProducerTest.java   | 42 ++--------------------
 4 files changed, 5 insertions(+), 102 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 1d2182e..c459720 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -332,16 +332,12 @@ Before sending a message to Kafka you can configure the following headers.
 |===
 | Header constant              | Header value          | Type    | Description
 | KafkaConstants.KEY           | "kafka.KEY"           | Object  | *Required* The key of the message in order to ensure that all related message goes in the same partition
-| KafkaConstants.TOPIC         | "kafka.TOPIC"         | String  | The topic to which send the message (only read if the `bridgeEndpoint` endpoint parameter is `true`)
 | KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String  | The topic to which send the message (override and takes precedence), and the header is not preserved. |
 | KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
 |===
 
-If you want to send a message to a dynamic topic then favour using `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
-that are not send along the message, as its removed in the producer. If you are using `KafkaConstants.TOPIC` then the header
-is propagated in the producer, which means that if the consumer is also Camel and you want to route to a 3rd topic then
-this header may interfere and route to a previous topic, which is not your intention. Therefore you need to turn on the bridgeEndpoint option;
-or better just use the `KafkaConstants.OVERRIDE_TOPIC` header instead.
+If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
+that are not send along the message, as its removed in the producer.
 
 After the message is sent to Kafka, the following headers are available
 [width="100%",cols="2m,2m,1m,5",options="header"]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 71564fe..02db043 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -137,12 +137,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
 
-    // Producer Camel specific configuration properties
-    @UriParam(label = "producer")
-    private boolean bridgeEndpoint;
-    @UriParam(label = "producer", defaultValue = "true")
-    private boolean circularTopicDetection = true;
-
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
     private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@@ -556,36 +550,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
         this.groupId = groupId;
     }
 
-    public boolean isBridgeEndpoint() {
-        return bridgeEndpoint;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will ignore the
-     * KafkaConstants.TOPIC header setting of the inbound message.
-     */
-    public void setBridgeEndpoint(boolean bridgeEndpoint) {
-        this.bridgeEndpoint = bridgeEndpoint;
-    }
-
-    public boolean isCircularTopicDetection() {
-        return circularTopicDetection;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will detect if the message is
-     * attempted to be sent back to the same topic it may come from, if the
-     * message was original from a kafka consumer. If the KafkaConstants.TOPIC
-     * header is the same as the original kafka consumer topic, then the header
-     * setting is ignored, and the topic of the producer endpoint is used. In
-     * other words this avoids sending the same message back to where it came
-     * from. This option is not in use if the option bridgeEndpoint is set to
-     * true.
-     */
-    public void setCircularTopicDetection(boolean circularTopicDetection) {
-        this.circularTopicDetection = circularTopicDetection;
-    }
-
     public String getPartitioner() {
         return partitioner;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index ea3c8a6..04bbb32 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -147,27 +147,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
         if (overrideTopic != null) {
             log.debug("Using override topic: {}", overrideTopic);
             topic = overrideTopic.toString();
-        } else if (!endpoint.getConfiguration().isBridgeEndpoint()) {
-            String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
-            boolean allowHeader = true;
-
-            // when we do not bridge then detect if we try to send back to ourselves
-            // which we most likely do not want to do
-            if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection()) {
-                Endpoint from = exchange.getFromEndpoint();
-                if (from instanceof KafkaEndpoint) {
-                    String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();
-                    allowHeader = !headerTopic.equals(fromTopic);
-                    if (!allowHeader) {
-                        log.debug("Circular topic detected from message header."
-                                + " Cannot send to same topic as the message comes from: {}"
-                                + ". Will use endpoint configured topic: {}", from, topic);
-                    }
-                }
-            }
-            if (allowHeader && headerTopic != null) {
-                topic = headerTopic;
-            }
         }
 
         if (topic == null) {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index e3120fa..93512fc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -170,7 +170,7 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("anotherTopic");
+        verifySendMessage("sometopic");
         assertRecordMetadataExists();
     }
 
@@ -189,7 +189,7 @@ public class KafkaProducerTest {
         // the header is preserved
         assertNotNull(in.getHeader(KafkaConstants.TOPIC));
 
-        verifySendMessage(4, "anotherTopic", "someKey");
+        verifySendMessage(4, "sometopic", "someKey");
         assertRecordMetadataExists();
     }
 
@@ -277,9 +277,8 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendMessageWithBridgeEndpoint() throws Exception {
+    public void processSendMessageWithTopicHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
-        endpoint.getConfiguration().setBridgeEndpoint(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
@@ -292,41 +291,6 @@ public class KafkaProducerTest {
         assertRecordMetadataExists();
     }
 
-    @Test
-    public void processSendMessageWithCircularDetected() throws Exception {
-        endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.getConfiguration().setCircularTopicDetection(true);
-        Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getOut()).thenReturn(out);
-        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
-        // this is the from topic that are from the fromEndpoint
-        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
-        in.setHeader(KafkaConstants.KEY, "somekey");
-
-        producer.process(exchange);
-
-        verifySendMessage("sometopic", "somekey");
-        assertRecordMetadataExists();
-    }
-
-    @Test
-    public void processSendMessageWithNoCircularDetected() throws Exception {
-        endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.getConfiguration().setCircularTopicDetection(false);
-        Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getOut()).thenReturn(out);
-        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
-        // this is the from topic that are from the fromEndpoint
-        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
-        in.setHeader(KafkaConstants.KEY, "somekey");
-
-        producer.process(exchange);
-
-        // will end up sending back to itself at fromtopic
-        verifySendMessage("fromtopic", "somekey");
-        assertRecordMetadataExists();
-    }
-
     @Test // Message and Topic Name alone
     public void processSendsMessageWithMessageTopicName() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");