You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/24 11:34:39 UTC

[1/3] camel git commit: CAMEL-1193: Make kafka easier to use when routing between topics to avoid the header topic causing Camel to send the message to itself instead of the endpoint topic name as users would expect.

Repository: camel
Updated Branches:
  refs/heads/master bab5b27bc -> 6243402b2


CAMEL-1193: Make kafka easier to use when routing between topics to avoid the header topic causing Camel to send the message to itself instead of the endpoint topic name as users would expect.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d164d54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d164d54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d164d54

Branch: refs/heads/master
Commit: 1d164d54675069fb672be606e99c2c7944cd8f23
Parents: bab5b27
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Apr 24 13:03:56 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 24 13:14:41 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  3 +-
 .../camel/component/kafka/KafkaEndpoint.java    | 18 +++++++-
 .../camel/component/kafka/KafkaProducer.java    | 24 ++++++++++-
 .../component/kafka/KafkaProducerTest.java      | 44 ++++++++++++++++++--
 4 files changed, 83 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 4604e9c..dceca6f 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -66,7 +66,7 @@ with the following path and query parameters:
 | **topic** | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic. |  | String
 |=======================================================================
 
-#### Query Parameters (82 parameters):
+#### Query Parameters (83 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -100,6 +100,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **bridgeEndpoint** (producer) | If the option is true then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. | false | boolean
 | **bufferMemorySize** (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. | 33554432 | Integer
+| **circularKeyDetection** (producer) | If the option is true then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic then the header setting is ignored and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy. | none | String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
 | **key** (producer) | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY |  | String

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 46bf844..b4c86ef 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -20,7 +20,6 @@ import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -53,6 +52,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @UriParam(label = "producer")
     private boolean bridgeEndpoint;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean circularTopicDetection = true;
 
     public KafkaEndpoint() {
     }
@@ -196,4 +197,19 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public void setBridgeEndpoint(boolean bridgeEndpoint) {
         this.bridgeEndpoint = bridgeEndpoint;
     }
+
+    public boolean isCircularTopicDetection() {
+        return circularTopicDetection;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will detect if the message is attempted to be sent back to the same topic
+     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the
+     * same as the original kafka consumer topic, then the header setting is ignored, and the topic of the producer
+     * endpoint is used. In other words this avoids sending the same message back to where it came from.
+     * This option is not in use if the option bridgeEndpoint is set to true.
+     */
+    public void setCircularTopicDetection(boolean circularTopicDetection) {
+        this.circularTopicDetection = circularTopicDetection;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 65fd9d2..01d29b5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.kafka.clients.producer.Callback;
@@ -120,9 +121,30 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings("unchecked")
     protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getConfiguration().getTopic();
+
         if (!endpoint.isBridgeEndpoint()) {
-            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
+            String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
+            boolean allowHeader = true;
+
+            // when we do not bridge then detect if we try to send back to ourselves
+            // which we most likely do not want to do
+            if (headerTopic != null && endpoint.isCircularTopicDetection()) {
+                Endpoint from = exchange.getFromEndpoint();
+                if (from instanceof KafkaEndpoint) {
+                    String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();
+                    allowHeader = !headerTopic.equals(fromTopic);
+                    if (!allowHeader) {
+                        log.debug("Circular topic detected from message header."
+                            + " Cannot send to same topic as the message comes from: {}"
+                            + ". Will use endpoint configured topic: {}", from, topic);
+                    }
+                }
+            }
+            if (allowHeader && headerTopic != null) {
+                topic = headerTopic;
+            }
         }
+
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d30e737..9143017 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -47,6 +47,7 @@ public class KafkaProducerTest {
 
     private KafkaProducer producer;
     private KafkaEndpoint endpoint;
+    private KafkaEndpoint fromEndpoint;
 
     private TypeConverter converter = Mockito.mock(TypeConverter.class);
     private CamelContext context = Mockito.mock(CamelContext.class);
@@ -63,6 +64,8 @@ public class KafkaProducerTest {
         endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new HashMap());
         producer = new KafkaProducer(endpoint);
 
+        fromEndpoint = kafka.createEndpoint("kafka:fromtopic", "fromtopic", new HashMap());
+
         RecordMetadata rm = new RecordMetadata(null, 1, 1);
         Future future = Mockito.mock(Future.class);
         Mockito.when(future.get()).thenReturn(rm);
@@ -204,7 +207,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
+    public void processSendsMessageWithPartitionKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
@@ -218,7 +221,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendsMesssageWithMessageKeyHeader() throws Exception {
+    public void processSendsMessageWithMessageKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
@@ -246,8 +249,43 @@ public class KafkaProducerTest {
         assertRecordMetadataExists();
     }
 
+    @Test
+    public void processSendMessageWithCircularDetected() throws Exception {
+        endpoint.getConfiguration().setTopic("sometopic");
+        endpoint.setCircularTopicDetection(true); // enable by default
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
+        // this is the from topic that are from the fromEndpoint
+        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
+        in.setHeader(KafkaConstants.KEY, "somekey");
+
+        producer.process(exchange);
+
+        verifySendMessage("sometopic", "somekey");
+        assertRecordMetadataExists();
+    }
+
+    @Test
+    public void processSendMessageWithNoCircularDetected() throws Exception {
+        endpoint.getConfiguration().setTopic("sometopic");
+        endpoint.setCircularTopicDetection(false); // enable by default
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
+        // this is the from topic that are from the fromEndpoint
+        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
+        in.setHeader(KafkaConstants.KEY, "somekey");
+
+        producer.process(exchange);
+
+        // will end up sending back to itself at fromtopic
+        verifySendMessage("fromtopic", "somekey");
+        assertRecordMetadataExists();
+    }
+
     @Test // Message and Topic Name alone
-    public void processSendsMesssageWithMessageTopicName() throws Exception {
+    public void processSendsMessageWithMessageTopicName() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);


[3/3] camel git commit: Add some logging in kafka producer when it sends/has all sent etc.

Posted by da...@apache.org.
Add some logging in kafka producer when it sends/has all sent etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6243402b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6243402b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6243402b

Branch: refs/heads/master
Commit: 6243402b2291af99c7f7b950f5e905489d403074
Parents: 8b5e93e
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Apr 24 13:29:33 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 24 13:29:33 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaProducer.java   | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6243402b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index ede3d3e..e3b556b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -232,7 +232,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         while (c.hasNext()) {
-            futures.add(kafkaProducer.send(c.next()));
+            ProducerRecord rec = c.next();
+            if (log.isDebugEnabled()) {
+                log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
+            }
+            futures.add(kafkaProducer.send(rec));
         }
         for (Future<RecordMetadata> f : futures) {
             //wait for them all to be sent
@@ -248,7 +252,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
             KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
             while (c.hasNext()) {
                 cb.increment();
-                kafkaProducer.send(c.next(), cb);
+                ProducerRecord rec = c.next();
+                if (log.isDebugEnabled()) {
+                    log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
+                }
+                kafkaProducer.send(rec, cb);
             }
             return cb.allSent();
         } catch (Exception ex) {
@@ -306,6 +314,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         boolean allSent() {
             if (count.decrementAndGet() == 0) {
+                log.trace("All messages sent, continue routing.");
                 //was able to get all the work done while queuing the requests
                 callback.done(true);
                 return true;
@@ -327,6 +336,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 workerPool.submit(new Runnable() {
                     @Override
                     public void run() {
+                        log.trace("All messages sent, continue routing.");
                         callback.done(false);
                     }
                 });


[2/3] camel git commit: Make kafka bridgeEndpoint option on configuraion so they are all there and thus also available via spring boot

Posted by da...@apache.org.
Make kafka bridgeEndpoint option on configuraion so they are all there and thus also available via spring boot


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8b5e93ec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8b5e93ec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8b5e93ec

Branch: refs/heads/master
Commit: 8b5e93ec694ef31a2bd43248735d3bc274f4e9d3
Parents: 1d164d5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Apr 24 13:24:20 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 24 13:24:20 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  2 +-
 .../component/kafka/KafkaConfiguration.java     | 32 ++++++++++++++++++++
 .../camel/component/kafka/KafkaEndpoint.java    | 29 ------------------
 .../camel/component/kafka/KafkaProducer.java    |  4 +--
 .../component/kafka/KafkaProducerTest.java      |  6 ++--
 .../springboot/KafkaComponentConfiguration.java | 32 ++++++++++++++++++++
 6 files changed, 70 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index dceca6f..5637673 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -100,7 +100,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **bridgeEndpoint** (producer) | If the option is true then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. | false | boolean
 | **bufferMemorySize** (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. | 33554432 | Integer
-| **circularKeyDetection** (producer) | If the option is true then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic then the header setting is ignored and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
+| **circularTopicDetection** (producer) | If the option is true then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic then the header setting is ignored and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy. | none | String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
 | **key** (producer) | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY |  | String

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index a609c79..bb4acbc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -118,6 +118,12 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
 
+    //Producer Camel specific configuration properties
+    @UriParam(label = "producer")
+    private boolean bridgeEndpoint;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean circularTopicDetection = true;
+
     //Producer configuration properties
     @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
     private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@@ -493,6 +499,32 @@ public class KafkaConfiguration implements Cloneable {
         this.groupId = groupId;
     }
 
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
+     */
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
+
+    public boolean isCircularTopicDetection() {
+        return circularTopicDetection;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will detect if the message is attempted to be sent back to the same topic
+     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the
+     * same as the original kafka consumer topic, then the header setting is ignored, and the topic of the producer
+     * endpoint is used. In other words this avoids sending the same message back to where it came from.
+     * This option is not in use if the option bridgeEndpoint is set to true.
+     */
+    public void setCircularTopicDetection(boolean circularTopicDetection) {
+        this.circularTopicDetection = circularTopicDetection;
+    }
+
     public String getPartitioner() {
         return partitioner;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index b4c86ef..14932bf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -50,10 +50,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
 
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
-    @UriParam(label = "producer")
-    private boolean bridgeEndpoint;
-    @UriParam(label = "producer", defaultValue = "true")
-    private boolean circularTopicDetection = true;
 
     public KafkaEndpoint() {
     }
@@ -187,29 +183,4 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return new KafkaProducer(endpoint);
     }
 
-    public boolean isBridgeEndpoint() {
-        return bridgeEndpoint;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
-     */
-    public void setBridgeEndpoint(boolean bridgeEndpoint) {
-        this.bridgeEndpoint = bridgeEndpoint;
-    }
-
-    public boolean isCircularTopicDetection() {
-        return circularTopicDetection;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will detect if the message is attempted to be sent back to the same topic
-     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the
-     * same as the original kafka consumer topic, then the header setting is ignored, and the topic of the producer
-     * endpoint is used. In other words this avoids sending the same message back to where it came from.
-     * This option is not in use if the option bridgeEndpoint is set to true.
-     */
-    public void setCircularTopicDetection(boolean circularTopicDetection) {
-        this.circularTopicDetection = circularTopicDetection;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 01d29b5..ede3d3e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -122,13 +122,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
     protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getConfiguration().getTopic();
 
-        if (!endpoint.isBridgeEndpoint()) {
+        if (!endpoint.getConfiguration().isBridgeEndpoint()) {
             String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
             boolean allowHeader = true;
 
             // when we do not bridge then detect if we try to send back to ourselves
             // which we most likely do not want to do
-            if (headerTopic != null && endpoint.isCircularTopicDetection()) {
+            if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection()) {
                 Endpoint from = exchange.getFromEndpoint();
                 if (from instanceof KafkaEndpoint) {
                     String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 9143017..41378b8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -236,7 +236,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithBridgeEndpoint() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
-        endpoint.setBridgeEndpoint(true);
+        endpoint.getConfiguration().setBridgeEndpoint(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
@@ -252,7 +252,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithCircularDetected() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.setCircularTopicDetection(true); // enable by default
+        endpoint.getConfiguration().setCircularTopicDetection(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
@@ -269,7 +269,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithNoCircularDetected() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.setCircularTopicDetection(false); // enable by default
+        endpoint.getConfiguration().setCircularTopicDetection(false);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 160c552..f5dfddd 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -117,6 +117,22 @@ public class KafkaComponentConfiguration {
          */
         private String groupId;
         /**
+         * If the option is true, then KafkaProducer will ignore the
+         * KafkaConstants.TOPIC header setting of the inbound message.
+         */
+        private Boolean bridgeEndpoint = false;
+        /**
+         * If the option is true, then KafkaProducer will detect if the message
+         * is attempted to be sent back to the same topic it may come from, if
+         * the message was original from a kafka consumer. If the
+         * KafkaConstants.TOPIC header is the same as the original kafka
+         * consumer topic, then the header setting is ignored, and the topic of
+         * the producer endpoint is used. In other words this avoids sending the
+         * same message back to where it came from. This option is not in use if
+         * the option bridgeEndpoint is set to true.
+         */
+        private Boolean circularTopicDetection = true;
+        /**
          * The partitioner class for partitioning messages amongst sub-topics.
          * The default partitioner is based on the hash of the key.
          */
@@ -631,6 +647,22 @@ public class KafkaComponentConfiguration {
             this.groupId = groupId;
         }
 
+        public Boolean getBridgeEndpoint() {
+            return bridgeEndpoint;
+        }
+
+        public void setBridgeEndpoint(Boolean bridgeEndpoint) {
+            this.bridgeEndpoint = bridgeEndpoint;
+        }
+
+        public Boolean getCircularTopicDetection() {
+            return circularTopicDetection;
+        }
+
+        public void setCircularTopicDetection(Boolean circularTopicDetection) {
+            this.circularTopicDetection = circularTopicDetection;
+        }
+
         public String getPartitioner() {
             return partitioner;
         }