You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/24 11:34:40 UTC

[2/3] camel git commit: Make kafka bridgeEndpoint option on configuraion so they are all there and thus also available via spring boot

Make kafka bridgeEndpoint option on configuraion so they are all there and thus also available via spring boot


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8b5e93ec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8b5e93ec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8b5e93ec

Branch: refs/heads/master
Commit: 8b5e93ec694ef31a2bd43248735d3bc274f4e9d3
Parents: 1d164d5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Apr 24 13:24:20 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 24 13:24:20 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  2 +-
 .../component/kafka/KafkaConfiguration.java     | 32 ++++++++++++++++++++
 .../camel/component/kafka/KafkaEndpoint.java    | 29 ------------------
 .../camel/component/kafka/KafkaProducer.java    |  4 +--
 .../component/kafka/KafkaProducerTest.java      |  6 ++--
 .../springboot/KafkaComponentConfiguration.java | 32 ++++++++++++++++++++
 6 files changed, 70 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index dceca6f..5637673 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -100,7 +100,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **bridgeEndpoint** (producer) | If the option is true then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. | false | boolean
 | **bufferMemorySize** (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. | 33554432 | Integer
-| **circularKeyDetection** (producer) | If the option is true then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic then the header setting is ignored and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
+| **circularTopicDetection** (producer) | If the option is true then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic then the header setting is ignored and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy. | none | String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
 | **key** (producer) | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY |  | String

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index a609c79..bb4acbc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -118,6 +118,12 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
 
+    //Producer Camel specific configuration properties
+    @UriParam(label = "producer")
+    private boolean bridgeEndpoint;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean circularTopicDetection = true;
+
     //Producer configuration properties
     @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
     private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@@ -493,6 +499,32 @@ public class KafkaConfiguration implements Cloneable {
         this.groupId = groupId;
     }
 
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
+     */
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
+
+    public boolean isCircularTopicDetection() {
+        return circularTopicDetection;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will detect if the message is attempted to be sent back to the same topic
+     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the
+     * same as the original kafka consumer topic, then the header setting is ignored, and the topic of the producer
+     * endpoint is used. In other words this avoids sending the same message back to where it came from.
+     * This option is not in use if the option bridgeEndpoint is set to true.
+     */
+    public void setCircularTopicDetection(boolean circularTopicDetection) {
+        this.circularTopicDetection = circularTopicDetection;
+    }
+
     public String getPartitioner() {
         return partitioner;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index b4c86ef..14932bf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -50,10 +50,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
 
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
-    @UriParam(label = "producer")
-    private boolean bridgeEndpoint;
-    @UriParam(label = "producer", defaultValue = "true")
-    private boolean circularTopicDetection = true;
 
     public KafkaEndpoint() {
     }
@@ -187,29 +183,4 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return new KafkaProducer(endpoint);
     }
 
-    public boolean isBridgeEndpoint() {
-        return bridgeEndpoint;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
-     */
-    public void setBridgeEndpoint(boolean bridgeEndpoint) {
-        this.bridgeEndpoint = bridgeEndpoint;
-    }
-
-    public boolean isCircularTopicDetection() {
-        return circularTopicDetection;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will detect if the message is attempted to be sent back to the same topic
-     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the
-     * same as the original kafka consumer topic, then the header setting is ignored, and the topic of the producer
-     * endpoint is used. In other words this avoids sending the same message back to where it came from.
-     * This option is not in use if the option bridgeEndpoint is set to true.
-     */
-    public void setCircularTopicDetection(boolean circularTopicDetection) {
-        this.circularTopicDetection = circularTopicDetection;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 01d29b5..ede3d3e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -122,13 +122,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
     protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getConfiguration().getTopic();
 
-        if (!endpoint.isBridgeEndpoint()) {
+        if (!endpoint.getConfiguration().isBridgeEndpoint()) {
             String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
             boolean allowHeader = true;
 
             // when we do not bridge then detect if we try to send back to ourselves
             // which we most likely do not want to do
-            if (headerTopic != null && endpoint.isCircularTopicDetection()) {
+            if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection()) {
                 Endpoint from = exchange.getFromEndpoint();
                 if (from instanceof KafkaEndpoint) {
                     String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 9143017..41378b8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -236,7 +236,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithBridgeEndpoint() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
-        endpoint.setBridgeEndpoint(true);
+        endpoint.getConfiguration().setBridgeEndpoint(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
@@ -252,7 +252,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithCircularDetected() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.setCircularTopicDetection(true); // enable by default
+        endpoint.getConfiguration().setCircularTopicDetection(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
@@ -269,7 +269,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithNoCircularDetected() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.setCircularTopicDetection(false); // enable by default
+        endpoint.getConfiguration().setCircularTopicDetection(false);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 160c552..f5dfddd 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -117,6 +117,22 @@ public class KafkaComponentConfiguration {
          */
         private String groupId;
         /**
+         * If the option is true, then KafkaProducer will ignore the
+         * KafkaConstants.TOPIC header setting of the inbound message.
+         */
+        private Boolean bridgeEndpoint = false;
+        /**
+         * If the option is true, then KafkaProducer will detect if the message
+         * is attempted to be sent back to the same topic it may come from, if
+         * the message was original from a kafka consumer. If the
+         * KafkaConstants.TOPIC header is the same as the original kafka
+         * consumer topic, then the header setting is ignored, and the topic of
+         * the producer endpoint is used. In other words this avoids sending the
+         * same message back to where it came from. This option is not in use if
+         * the option bridgeEndpoint is set to true.
+         */
+        private Boolean circularTopicDetection = true;
+        /**
          * The partitioner class for partitioning messages amongst sub-topics.
          * The default partitioner is based on the hash of the key.
          */
@@ -631,6 +647,22 @@ public class KafkaComponentConfiguration {
             this.groupId = groupId;
         }
 
+        public Boolean getBridgeEndpoint() {
+            return bridgeEndpoint;
+        }
+
+        public void setBridgeEndpoint(Boolean bridgeEndpoint) {
+            this.bridgeEndpoint = bridgeEndpoint;
+        }
+
+        public Boolean getCircularTopicDetection() {
+            return circularTopicDetection;
+        }
+
+        public void setCircularTopicDetection(Boolean circularTopicDetection) {
+            this.circularTopicDetection = circularTopicDetection;
+        }
+
         public String getPartitioner() {
             return partitioner;
         }