You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/01 10:20:45 UTC
[camel] 02/02: CAMEL-13878: Message is forwarded to the wrong Kafka
Topic.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 34259dcfa79a5e60fad86ee0f32be977d499b2da
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Oct 1 12:20:19 2019 +0200
CAMEL-13878: Message is forwarded to the wrong Kafka Topic.
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 8 ++---
.../camel/component/kafka/KafkaConfiguration.java | 36 -------------------
.../camel/component/kafka/KafkaProducer.java | 21 -----------
.../camel/component/kafka/KafkaProducerTest.java | 42 ++--------------------
4 files changed, 5 insertions(+), 102 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 1d2182e..c459720 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -332,16 +332,12 @@ Before sending a message to Kafka you can configure the following headers.
|===
| Header constant | Header value | Type | Description
| KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition
-| KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic to which send the message (only read if the `bridgeEndpoint` endpoint parameter is `true`)
| KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String | The topic to which send the message (override and takes precedence), and the header is not preserved. |
| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
|===
-If you want to send a message to a dynamic topic then favour using `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
-that are not send along the message, as its removed in the producer. If you are using `KafkaConstants.TOPIC` then the header
-is propagated in the producer, which means that if the consumer is also Camel and you want to route to a 3rd topic then
-this header may interfere and route to a previous topic, which is not your intention. Therefore you need to turn on the bridgeEndpoint option;
-or better just use the `KafkaConstants.OVERRIDE_TOPIC` header instead.
+If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
+that are not send along the message, as its removed in the producer.
After the message is sent to Kafka, the following headers are available
[width="100%",cols="2m,2m,1m,5",options="header"]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 71564fe..02db043 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -137,12 +137,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
@UriParam(label = "consumer")
private StateRepository<String, String> offsetRepository;
- // Producer Camel specific configuration properties
- @UriParam(label = "producer")
- private boolean bridgeEndpoint;
- @UriParam(label = "producer", defaultValue = "true")
- private boolean circularTopicDetection = true;
-
// Producer configuration properties
@UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@@ -556,36 +550,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
this.groupId = groupId;
}
- public boolean isBridgeEndpoint() {
- return bridgeEndpoint;
- }
-
- /**
- * If the option is true, then KafkaProducer will ignore the
- * KafkaConstants.TOPIC header setting of the inbound message.
- */
- public void setBridgeEndpoint(boolean bridgeEndpoint) {
- this.bridgeEndpoint = bridgeEndpoint;
- }
-
- public boolean isCircularTopicDetection() {
- return circularTopicDetection;
- }
-
- /**
- * If the option is true, then KafkaProducer will detect if the message is
- * attempted to be sent back to the same topic it may come from, if the
- * message was original from a kafka consumer. If the KafkaConstants.TOPIC
- * header is the same as the original kafka consumer topic, then the header
- * setting is ignored, and the topic of the producer endpoint is used. In
- * other words this avoids sending the same message back to where it came
- * from. This option is not in use if the option bridgeEndpoint is set to
- * true.
- */
- public void setCircularTopicDetection(boolean circularTopicDetection) {
- this.circularTopicDetection = circularTopicDetection;
- }
-
public String getPartitioner() {
return partitioner;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index ea3c8a6..04bbb32 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -147,27 +147,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
if (overrideTopic != null) {
log.debug("Using override topic: {}", overrideTopic);
topic = overrideTopic.toString();
- } else if (!endpoint.getConfiguration().isBridgeEndpoint()) {
- String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
- boolean allowHeader = true;
-
- // when we do not bridge then detect if we try to send back to ourselves
- // which we most likely do not want to do
- if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection()) {
- Endpoint from = exchange.getFromEndpoint();
- if (from instanceof KafkaEndpoint) {
- String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();
- allowHeader = !headerTopic.equals(fromTopic);
- if (!allowHeader) {
- log.debug("Circular topic detected from message header."
- + " Cannot send to same topic as the message comes from: {}"
- + ". Will use endpoint configured topic: {}", from, topic);
- }
- }
- }
- if (allowHeader && headerTopic != null) {
- topic = headerTopic;
- }
}
if (topic == null) {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index e3120fa..93512fc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -170,7 +170,7 @@ public class KafkaProducerTest {
producer.process(exchange);
- verifySendMessage("anotherTopic");
+ verifySendMessage("sometopic");
assertRecordMetadataExists();
}
@@ -189,7 +189,7 @@ public class KafkaProducerTest {
// the header is preserved
assertNotNull(in.getHeader(KafkaConstants.TOPIC));
- verifySendMessage(4, "anotherTopic", "someKey");
+ verifySendMessage(4, "sometopic", "someKey");
assertRecordMetadataExists();
}
@@ -277,9 +277,8 @@ public class KafkaProducerTest {
}
@Test
- public void processSendMessageWithBridgeEndpoint() throws Exception {
+ public void processSendMessageWithTopicHeader() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
- endpoint.getConfiguration().setBridgeEndpoint(true);
Mockito.when(exchange.getIn()).thenReturn(in);
Mockito.when(exchange.getOut()).thenReturn(out);
in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
@@ -292,41 +291,6 @@ public class KafkaProducerTest {
assertRecordMetadataExists();
}
- @Test
- public void processSendMessageWithCircularDetected() throws Exception {
- endpoint.getConfiguration().setTopic("sometopic");
- endpoint.getConfiguration().setCircularTopicDetection(true);
- Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getOut()).thenReturn(out);
- Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
- // this is the from topic that are from the fromEndpoint
- in.setHeader(KafkaConstants.TOPIC, "fromtopic");
- in.setHeader(KafkaConstants.KEY, "somekey");
-
- producer.process(exchange);
-
- verifySendMessage("sometopic", "somekey");
- assertRecordMetadataExists();
- }
-
- @Test
- public void processSendMessageWithNoCircularDetected() throws Exception {
- endpoint.getConfiguration().setTopic("sometopic");
- endpoint.getConfiguration().setCircularTopicDetection(false);
- Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getOut()).thenReturn(out);
- Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
- // this is the from topic that are from the fromEndpoint
- in.setHeader(KafkaConstants.TOPIC, "fromtopic");
- in.setHeader(KafkaConstants.KEY, "somekey");
-
- producer.process(exchange);
-
- // will end up sending back to itself at fromtopic
- verifySendMessage("fromtopic", "somekey");
- assertRecordMetadataExists();
- }
-
@Test // Message and Topic Name alone
public void processSendsMessageWithMessageTopicName() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");