Posted to commits@camel.apache.org by da...@apache.org on 2017/02/15 10:23:42 UTC

[1/4] camel git commit: CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make the topic part of the context path so that usage is similar to JMS and other components.

Repository: camel
Updated Branches:
  refs/heads/master fbd2438fe -> bcb4ed25b


CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make the topic part of the context path so that usage is similar to JMS and other components.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/eccfc853
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/eccfc853
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/eccfc853

Branch: refs/heads/master
Commit: eccfc853902ae994a350c4eb9fe9136f5548de19
Parents: fbd2438
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Feb 15 10:58:57 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Feb 15 10:58:57 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  7 +-
 .../camel/component/kafka/KafkaComponent.java   | 53 +++++++-------
 .../component/kafka/KafkaConfiguration.java     |  6 +-
 .../camel/component/kafka/KafkaEndpoint.java    |  2 +-
 .../component/kafka/BaseEmbeddedKafkaTest.java  | 13 +++-
 .../component/kafka/KafkaComponentTest.java     | 74 +++++---------------
 .../kafka/KafkaConsumerBatchSizeTest.java       |  4 +-
 .../component/kafka/KafkaConsumerFullTest.java  |  6 +-
 .../KafkaConsumerOffsetRepositoryEmptyTest.java |  4 +-
 ...KafkaConsumerOffsetRepositoryResumeTest.java |  4 +-
 .../component/kafka/KafkaEndpointTest.java      |  5 +-
 .../component/kafka/KafkaProducerFullTest.java  |  8 +--
 .../component/kafka/KafkaProducerTest.java      |  9 ++-
 .../kafka/MockConsumerInterceptor.java          |  8 +--
 .../kafka/MockProducerInterceptor.java          |  7 +-
 .../kafka/embedded/EmbeddedZookeeper.java       |  1 -
 .../component/kafka/embedded/SystemTime.java    |  1 +
 .../component/kafka/embedded/TestUtils.java     |  1 +
 .../springboot/KafkaComponentConfiguration.java | 18 +++++
 19 files changed, 107 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
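
In practice the change means the topic moves into the endpoint context path and the broker list becomes a query option (or, with the new component option, a shared setting). A minimal sketch of the new URI shape in a route, assuming a local broker on port 9092 and illustrative topic/group names:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaTopicRoute extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            // Old style: kafka:localhost:9092?topic=orders&groupId=group1
            // New style: the topic is the context path, similar to JMS:
            from("kafka:orders?brokers=localhost:9092&groupId=group1&autoOffsetReset=earliest")
                .to("log:orders");
        }
    }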


http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 01d2bff..135b13f 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -44,7 +44,7 @@ For the option above default port 9092 is used in the URI
 
 
 // component options: START
-The Kafka component supports 1 options which are listed below.
+The Kafka component supports 2 options which are listed below.
 
 
 
@@ -52,6 +52,7 @@ The Kafka component supports 1 options which are listed below.
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
+| brokers | common |  | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | workerPool | advanced |  | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 |=======================================================================
 {% endraw %}
@@ -66,11 +67,11 @@ The Kafka component supports 80 endpoint options which are listed below:
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| brokers | common |  | String | *Required* This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| topic | common |  | String | *Required* Name of the topic to use.
+| brokers | common |  | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
-| topic | common |  | String | *Required* Name of the topic to use.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that the consumer offsets are committed to zookeeper.
 | autoOffsetReset | consumer | latest | String | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset fail: throw exception to the consumer

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index f2d22f2..5c7ce9b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -18,22 +18,17 @@ package org.apache.camel.component.kafka;
 
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.spi.Metadata;
+import org.apache.camel.util.ObjectHelper;
 
 public class KafkaComponent extends UriEndpointComponent {
     
-    // Topic name validation as per Kafka documentation [a-zA-Z0-9\\._\\-] as of 0.10
-    // hostname and port are extracted as per pattern. IP and hostname syntax is not validated using regex.
-    
-    static final Pattern SIMPLE_KAFKA_URI_PATTERN = Pattern.compile("([a-z0-9\\.]*)(:?)([0-9]*)/([a-zA-Z0-9\\._\\-]*)", Pattern.CASE_INSENSITIVE);
-    
-    static final String DEFAULT_PORT = "9092";
+    @Metadata(label = "common")
+    private String brokers;
 
     @Metadata(label = "advanced")
     private ExecutorService workerPool;
@@ -48,36 +43,36 @@ public class KafkaComponent extends UriEndpointComponent {
 
     @Override
     protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
-        
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
-        
-        Matcher matcher = SIMPLE_KAFKA_URI_PATTERN.matcher(remaining);
-               
-        if (matcher.matches()) {
-            String hostName = matcher.group(1);          
-            String port = matcher.group(3);
-            String topic = matcher.group(4);
-            
-            if (port != null && port.length() == 0) {
-                port = DEFAULT_PORT;
-            }            
-            endpoint.getConfiguration().setBrokers(hostName + ":" + port);
-            endpoint.getConfiguration().setTopic(topic);
-        } else {
-            String brokers = remaining.split("\\?")[0];
-            if (brokers != null) {
-                endpoint.getConfiguration().setBrokers(brokers);
-            }            
+
+        if (ObjectHelper.isEmpty(remaining)) {
+            throw new IllegalArgumentException("Topic must be configured on endpoint using syntax kafka:topic");
         }
+        endpoint.getConfiguration().setTopic(remaining);
 
-        // configure component options before endpoint properties which can override from params
-        endpoint.getConfiguration().setWorkerPool(workerPool);
+        endpoint.getConfiguration().setBrokers(getBrokers());
+        endpoint.getConfiguration().setWorkerPool(getWorkerPool());
 
         setProperties(endpoint.getConfiguration(), params);
         setProperties(endpoint, params);
         return endpoint;
     }
 
+    public String getBrokers() {
+        return brokers;
+    }
+
+    /**
+     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
+     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     */
+    public void setBrokers(String brokers) {
+        this.brokers = brokers;
+    }
+
     public ExecutorService getWorkerPool() {
         return workerPool;
     }
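
The new component-level brokers option means the broker list can be configured once on the KafkaComponent and shared by all endpoints, which then only need the topic in their URI. A small sketch of that wiring, assuming illustrative host names:

    import org.apache.camel.CamelContext;
    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.camel.impl.DefaultCamelContext;

    public class KafkaComponentSetup {
        public static void main(String[] args) throws Exception {
            CamelContext context = new DefaultCamelContext();

            // Set the bootstrap broker list once at component level.
            KafkaComponent kafka = new KafkaComponent();
            kafka.setBrokers("host1:9092,host2:9092");
            context.addComponent("kafka", kafka);

            // Endpoints can now be as short as "kafka:orders?groupId=group1".
            context.start();
        }
    }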

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 05476aa..b42e146 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -45,10 +45,8 @@ import org.apache.kafka.common.config.SslConfigs;
 public class KafkaConfiguration {
 
     @UriPath @Metadata(required = "true")
-    private String brokers;
-
-    @UriParam @Metadata(required = "true")
     private String topic;
+
     @UriParam
     private String groupId;
     @UriParam(defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
@@ -59,6 +57,8 @@ public class KafkaConfiguration {
     private int consumersCount = 1;
 
     //Common configuration properties
+    @UriParam(label = "common")
+    private String brokers;
     @UriParam
     private String clientId;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index f93c521..3c125e5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The kafka component allows messages to be sent to (or consumed from) Apache Kafka brokers.
  */
-@UriEndpoint(firstVersion = "2.13.0", scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", consumerClass = KafkaConsumer.class, label = "messaging")
+@UriEndpoint(firstVersion = "2.13.0", scheme = "kafka", title = "Kafka", syntax = "kafka:topic", consumerClass = KafkaConsumer.class, label = "messaging")
 public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaEndpoint.class);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 4ed1dea..e36ed0a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -30,9 +30,13 @@ import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BaseEmbeddedKafkaTest extends CamelTestSupport {
 
+    private static final Logger  LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
+
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;
 
@@ -56,9 +60,9 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         } catch (IOException e) {
             e.printStackTrace();
         }
-        System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
+        LOG.info("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
         embeddedKafkaCluster.startup();
-        System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
+        LOG.info("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
     }
 
     @AfterClass
@@ -82,6 +86,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
         context.addComponent("properties", new PropertiesComponent("ref:prop"));
+
+        KafkaComponent kafka = new KafkaComponent();
+        kafka.setBrokers("localhost:" + getKafkaPort());
+        context.addComponent("kafka", kafka);
+
         return context;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index e94ba24..8d106e9 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -30,21 +30,28 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-
 public class KafkaComponentTest {
 
     private CamelContext context = Mockito.mock(CamelContext.class);
 
     @Test
     public void testPropertiesSet() throws Exception {
-        Map<String, Object> params = new HashMap<String, Object>();
-        params.put("topic", "mytopic");
-        params.put("partitioner", "com.class.Party");
+        String uri = "kafka:mytopic?brokers=broker1:12345,broker2:12566&partitioner=com.class.Party";
 
-        String uri = "kafka:broker1:12345,broker2:12566";
-        String remaining = "broker1:12345,broker2:12566";
+        KafkaEndpoint endpoint = (KafkaEndpoint) new KafkaComponent(context).createEndpoint(uri);
+        assertEquals("broker1:12345,broker2:12566", endpoint.getConfiguration().getBrokers());
+        assertEquals("mytopic", endpoint.getConfiguration().getTopic());
+        assertEquals("com.class.Party", endpoint.getConfiguration().getPartitioner());
+    }
 
-        KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
+    @Test
+    public void testBrokersOnComponent() throws Exception {
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.setBrokers("broker1:12345,broker2:12566");
+
+        String uri = "kafka:mytopic?partitioner=com.class.Party";
+
+        KafkaEndpoint endpoint = (KafkaEndpoint) kafka.createEndpoint(uri);
         assertEquals("broker1:12345,broker2:12566", endpoint.getConfiguration().getBrokers());
         assertEquals("mytopic", endpoint.getConfiguration().getTopic());
         assertEquals("com.class.Party", endpoint.getConfiguration().getPartitioner());
@@ -55,11 +62,12 @@ public class KafkaComponentTest {
         Map<String, Object> params = new HashMap<String, Object>();
         setProducerProperty(params);
 
-        String uri = "kafka:dev1:12345,dev2:12566";
-        String remaining = "dev1:12345,dev2:12566";
+        String uri = "kafka:mytopic?brokers=dev1:12345,dev2:12566";
+        String remaining = "mytopic";
 
         KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
 
+        assertEquals("mytopic", endpoint.getConfiguration().getTopic());
         assertEquals("1", endpoint.getConfiguration().getRequestRequiredAcks());
         assertEquals(new Integer(1), endpoint.getConfiguration().getBufferMemorySize());
         assertEquals(new Integer(10), endpoint.getConfiguration().getProducerBatchSize());
@@ -109,8 +117,8 @@ public class KafkaComponentTest {
     public void testAllProducerKeys() throws Exception {
         Map<String, Object> params = new HashMap<String, Object>();
 
-        String uri = "kafka:dev1:12345,dev2:12566";
-        String remaining = "dev1:12345,dev2:12566";
+        String uri = "kafka:mytopic?brokers=dev1:12345,dev2:12566";
+        String remaining = "mytopic";
 
         KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
         assertEquals(endpoint.getConfiguration().createProducerProperties().keySet(), getProducerKeys().keySet());
@@ -203,48 +211,4 @@ public class KafkaComponentTest {
         params.put("sslTrustmanagerAlgorithm", "PKIX");
     }
     
-    // the URL format should include the topic name like the ActiiveMQ & AMQP endpoints
-    // kafka:serverName:port/topicName
-    // kafka:serverName/topicName
-    
-    @Test
-    public void testSimpleKakfaUriEndpoint() throws Exception {
-        
-        Map<String, Object> params = new HashMap<String, Object>();
- 
-        String uri = "kafka:broker1:9999/topic2One.33";
-        String remaining = "broker1:9999/topic2One.33";
-        
-
-        KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
-        
-        assertEquals("topic2One.33", endpoint.getConfiguration().getTopic());
-        assertEquals("broker1:9999", endpoint.getConfiguration().getBrokers());
-        
-        // port not provided in the URI
-        
-        uri = "kafka:broker1/click-Topic";
-        remaining = "broker1/click-Topic";
-        
-        endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
-        
-        assertEquals("click-Topic", endpoint.getConfiguration().getTopic());
-        assertEquals("broker1:9092", endpoint.getConfiguration().getBrokers());
-        
-        // IP Address provided instead of hostname
-        
-        uri = "kafka:10.10.10.3/click-Topic";
-        remaining = "10.10.10.3/click-Topic";
-        
-        endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
-        
-        assertEquals("click-Topic", endpoint.getConfiguration().getTopic());
-        assertEquals("10.10.10.3:9092", endpoint.getConfiguration().getBrokers());
-        
-       
-        
-        
-        
-    }   
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 3d004a1..2edbbb6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -32,8 +32,8 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:{{kafkaPort}}?topic=" + TOPIC
-            + "&groupId=group1"
+    @EndpointInject(uri = "kafka:" + TOPIC
+            + "?groupId=group1"
             + "&autoOffsetReset=earliest"
             + "&autoCommitEnable=false"
             + "&consumerStreams=10"

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 6b08f3d..5b449bd 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -18,14 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.io.IOException;
 import java.util.Properties;
-import java.util.function.Consumer;
 import java.util.stream.StreamSupport;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.After;
@@ -37,8 +35,8 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:{{kafkaPort}}?topic=" + TOPIC
-            + "&groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+    @EndpointInject(uri = "kafka:" + TOPIC
+            + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
             + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
             + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
index 620504f..e8cb50a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
@@ -95,8 +95,8 @@ public class KafkaConsumerOffsetRepositoryEmptyTest extends BaseEmbeddedKafkaTes
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
-                             "&groupId=A" +                            //
+                from("kafka:" + TOPIC +
+                             "?groupId=A" +
                              "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
                              "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
                              "&offsetRepository=#offset")              // Keep the offset in our repository

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
index 7e71c3e..36555a2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -97,8 +97,8 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
-                             "&groupId=A" +                            //
+                from("kafka:" + TOPIC +
+                             "?groupId=A" +
                              "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
                              "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
                              "&offsetRepository=#offset")              // Keep the offset in our repository

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index c7ea1e7..3e5300d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -29,8 +29,7 @@ public class KafkaEndpointTest {
 
     @Test
     public void testCreatingKafkaExchangeSetsHeaders() throws URISyntaxException {
-        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent());
-        endpoint.getConfiguration().setBrokers("localhost");
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:mytopic?brokers=localhost", new KafkaComponent());
 
         ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("topic", 4, 56, "somekey", "");
         Exchange exchange = endpoint.createKafkaExchange(record);
@@ -42,7 +41,7 @@ public class KafkaEndpointTest {
 
     @Test
     public void assertSingleton() throws URISyntaxException {
-        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent());
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:mytopic?brokers=localhost", new KafkaComponent());
         endpoint.getConfiguration().setBrokers("localhost");
         assertTrue(endpoint.isSingleton());
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 9298458..fb396c2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -52,16 +52,14 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     private static KafkaConsumer<String, String> stringsConsumerConn;
     private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
 
-    @EndpointInject(uri = "kafka:localhost:{{kafkaPort}}?topic=" + TOPIC_STRINGS
-            + "&requestRequiredAcks=-1")
+    @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
     private Endpoint toStrings;
-    @EndpointInject(uri = "kafka:localhost:{{kafkaPort}}?topic=" + TOPIC_INTERCEPTED
-            + "&requestRequiredAcks=-1"
+    @EndpointInject(uri = "kafka:" + TOPIC_INTERCEPTED + "?requestRequiredAcks=-1"
             + "&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor")
     private Endpoint toStringsWithInterceptor;
     @EndpointInject(uri = "mock:kafkaAck")
     private MockEndpoint mockEndpoint;
-    @EndpointInject(uri = "kafka:localhost:{{kafkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1"
+    @EndpointInject(uri = "kafka:" + TOPIC_BYTES + "?requestRequiredAcks=-1"
             + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
             + "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
     private Endpoint toBytes;

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index f87b5c2..17a3aec 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Executors;
@@ -25,6 +26,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -51,9 +53,10 @@ public class KafkaProducerTest {
 
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
-        endpoint = new KafkaEndpoint(
-                "kafka:broker1:1234,broker2:4567?topic=sometopic", null);
-        endpoint.getConfiguration().setBrokers("broker1:1234,broker2:4567");
+        KafkaComponent kafka = new KafkaComponent(new DefaultCamelContext());
+        kafka.setBrokers("broker1:1234,broker2:4567");
+
+        endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new HashMap());
         producer = new KafkaProducer(endpoint);
 
         RecordMetadata rm = new RecordMetadata(null, 1, 1);

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java
index b5b42de..503556a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka;
 
 import java.util.ArrayList;
@@ -29,7 +28,6 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, Stri
 
     public static ArrayList<ConsumerRecords<String, String>> recordsCaptured = new ArrayList<>();
 
-
     @Override
     public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
         recordsCaptured.add(consumerRecords);
@@ -38,16 +36,16 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, Stri
 
     @Override
     public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
-
+        // noop
     }
 
     @Override
     public void close() {
-
+        // noop
     }
 
     @Override
     public void configure(Map<String, ?> map) {
-
+        // noop
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockProducerInterceptor.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockProducerInterceptor.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockProducerInterceptor.java
index 1052040..a06170c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockProducerInterceptor.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockProducerInterceptor.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka;
 
 import java.util.ArrayList;
@@ -36,16 +35,16 @@ public class MockProducerInterceptor implements ProducerInterceptor<String, Stri
 
     @Override
     public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
-
+        // noop
     }
 
     @Override
     public void close() {
-
+        // noop
     }
 
     @Override
     public void configure(Map<String, ?> map) {
-
+        // noop
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
index 41762c9..4ce3bd7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
@@ -67,7 +67,6 @@ public class EmbeddedZookeeper {
         }
     }
 
-
     public void shutdown() {
         factory.shutdown();
         try {

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
index 2a9793a..43edda9 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.embedded;
 import kafka.utils.Time;
 
 class SystemTime implements Time {
+
     public long milliseconds() {
         return System.currentTimeMillis();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
index 7a71b14..25b3c3b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
@@ -23,6 +23,7 @@ import java.net.ServerSocket;
 import java.util.Random;
 
 final class TestUtils {
+
     private static final Random RANDOM = new Random();
 
     private TestUtils() {

http://git-wip-us.apache.org/repos/asf/camel/blob/eccfc853/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 5460500..9cb1576 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -29,6 +29,16 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class KafkaComponentConfiguration {
 
     /**
+     * This is for bootstrapping and the producer will only use it for getting
+     * metadata (topics partitions and replicas). The socket connections for
+     * sending the actual data will be established based on the broker
+     * information returned in the metadata. The format is
+     * host1:port1host2:port2 and the list can be a subset of brokers or a VIP
+     * pointing to a subset of brokers. This option is known as
+     * metadata.broker.list in the Kafka documentation.
+     */
+    private String brokers;
+    /**
      * To use a shared custom worker pool for continue routing Exchange after
      * kafka server has acknowledge the message that was sent to it from
      * KafkaProducer using asynchronous non-blocking processing. If using this
@@ -37,6 +47,14 @@ public class KafkaComponentConfiguration {
      */
     private ExecutorService workerPool;
 
+    public String getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(String brokers) {
+        this.brokers = brokers;
+    }
+
     public ExecutorService getWorkerPool() {
         return workerPool;
     }
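
For Spring Boot users the starter exposes the same option as a configuration property. A hedged example, assuming the starter binds this class under the usual camel.component.kafka prefix:

    # application.properties (property prefix assumed from the camel-kafka-starter)
    camel.component.kafka.brokers=host1:9092,host2:9092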


[3/4] camel git commit: CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make the topic part of the context path so that usage is similar to JMS and other components.

Posted by da...@apache.org.
CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make the topic part of the context path so that usage is similar to JMS and other components.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d3b38a05
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d3b38a05
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d3b38a05

Branch: refs/heads/master
Commit: d3b38a05b7b6552be1c083aa2aed529159471a90
Parents: d6e45cb
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Feb 15 11:14:38 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Feb 15 11:16:40 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  4 ++--
 .../camel/component/kafka/KafkaComponent.java   |  7 ++++---
 .../component/kafka/KafkaConfiguration.java     |  2 +-
 .../camel/component/kafka/KafkaConsumer.java    | 20 +++++++++++++++++---
 .../camel/component/kafka/KafkaEndpoint.java    |  6 ++++++
 .../camel/component/kafka/KafkaProducer.java    | 10 +++++++---
 .../component/kafka/BaseEmbeddedKafkaTest.java  |  2 +-
 .../component/kafka/KafkaComponentTest.java     |  7 ++++---
 .../KafkaConsumerOffsetRepositoryEmptyTest.java | 10 +++++-----
 ...KafkaConsumerOffsetRepositoryResumeTest.java | 10 +++++-----
 .../component/kafka/KafkaConsumerTest.java      |  4 ++++
 .../springboot/KafkaComponentConfiguration.java |  2 +-
 12 files changed, 57 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f9d2749..211d592 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -42,7 +42,7 @@ The Kafka component supports 2 options which are listed below.
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | workerPool | advanced |  | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 |=======================================================================
 {% endraw %}
@@ -58,7 +58,7 @@ The Kafka component supports 80 endpoint options which are listed below:
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | topic | common |  | String | *Required* Name of the topic to use.
-| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 575dcfc..da0f59d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -49,10 +49,11 @@ public class KafkaComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("Topic must be configured on endpoint using syntax kafka:topic");
         }
         endpoint.getConfiguration().setTopic(remaining);
-
-        endpoint.getConfiguration().setBrokers(getBrokers());
         endpoint.getConfiguration().setWorkerPool(getWorkerPool());
 
+        // brokers can be configured on either component or endpoint level
+        // and the consumer and produce is aware of this and act accordingly
+
         setProperties(endpoint.getConfiguration(), params);
         setProperties(endpoint, params);
         return endpoint;
@@ -66,7 +67,7 @@ public class KafkaComponent extends UriEndpointComponent {
      * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
     public void setBrokers(String brokers) {
         this.brokers = brokers;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index bec73da..57eea8f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -616,7 +616,7 @@ public class KafkaConfiguration {
      * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
     public void setBrokers(String brokers) {
         this.brokers = brokers;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 549c1c2..0327a76 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -55,9 +56,15 @@ public class KafkaConsumer extends DefaultConsumer {
         this.processor = processor;
         this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
 
-        if (endpoint.getConfiguration().getBrokers() == null) {
-            throw new IllegalArgumentException("BootStrap servers must be specified");
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
         }
+        if (ObjectHelper.isEmpty(brokers)) {
+            throw new IllegalArgumentException("Brokers must be configured");
+        }
+
         if (endpoint.getConfiguration().getGroupId() == null) {
             throw new IllegalArgumentException("groupId must not be null");
         }
@@ -66,7 +73,14 @@ public class KafkaConsumer extends DefaultConsumer {
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getConfiguration().getBrokers());
+
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
+        }
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getConfiguration().getGroupId());
         return props;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 3c125e5..46bf844 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -20,6 +20,7 @@ import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -60,6 +61,11 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         super(endpointUri, component);
     }
 
+    @Override
+    public KafkaComponent getComponent() {
+        return (KafkaComponent) super.getComponent();
+    }
+
     public KafkaConfiguration getConfiguration() {
         return configuration;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2ffe96b..abfc588 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -51,13 +51,17 @@ public class KafkaProducer extends DefaultAsyncProducer {
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
-        if (endpoint.getConfiguration().getBrokers() != null) {
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getConfiguration().getBrokers());
+
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
         }
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+
         return props;
     }
 
-
     public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
         return kafkaProducer;
     }
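
With this change the broker list is resolved at runtime: an endpoint-level brokers value wins, otherwise the consumer and producer fall back to the component-level value. A hedged sketch of what that precedence looks like from a user's perspective, with illustrative cluster names:

    import org.apache.camel.CamelContext;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.camel.impl.DefaultCamelContext;

    public class BrokerPrecedenceExample {
        public static void main(String[] args) throws Exception {
            CamelContext context = new DefaultCamelContext();

            // Component-level default broker list.
            KafkaComponent kafka = new KafkaComponent();
            kafka.setBrokers("cluster-a:9092");
            context.addComponent("kafka", kafka);

            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    // Inherits cluster-a:9092 from the component.
                    from("kafka:orders?groupId=group1").to("log:orders");
                    // Endpoint-level brokers override the component default.
                    from("kafka:audit?brokers=cluster-b:9092&groupId=group1").to("log:audit");
                }
            });

            context.start();
        }
    }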

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index e36ed0a..6b72d33 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public class BaseEmbeddedKafkaTest extends CamelTestSupport {
 
-    private static final Logger  LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
+    static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
 
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 8d106e9..92982cc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.kafka;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -30,6 +28,8 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertEquals;
+
 public class KafkaComponentTest {
 
     private CamelContext context = Mockito.mock(CamelContext.class);
@@ -52,7 +52,8 @@ public class KafkaComponentTest {
         String uri = "kafka:mytopic?partitioner=com.class.Party";
 
         KafkaEndpoint endpoint = (KafkaEndpoint) kafka.createEndpoint(uri);
-        assertEquals("broker1:12345,broker2:12566", endpoint.getConfiguration().getBrokers());
+        assertEquals(null, endpoint.getConfiguration().getBrokers());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getComponent().getBrokers());
         assertEquals("mytopic", endpoint.getConfiguration().getTopic());
         assertEquals("com.class.Party", endpoint.getConfiguration().getPartitioner());
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
index e8cb50a..2b59970 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
@@ -95,11 +95,11 @@ public class KafkaConsumerOffsetRepositoryEmptyTest extends BaseEmbeddedKafkaTes
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:" + TOPIC +
-                             "?groupId=A" +
-                             "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
-                             "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
-                             "&offsetRepository=#offset")              // Keep the offset in our repository
+                from("kafka:" + TOPIC
+                             + "?groupId=A"
+                             + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
+                             + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
+                             + "&offsetRepository=#offset")            // Keep the offset in our repository
                         .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
index 36555a2..fe8052a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -97,11 +97,11 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:" + TOPIC +
-                             "?groupId=A" +
-                             "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
-                             "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
-                             "&offsetRepository=#offset")              // Keep the offset in our repository
+                from("kafka:" + TOPIC
+                             + "?groupId=A"
+                             + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
+                             + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
+                             + "&offsetRepository=#offset")            // Keep the offset in our repository
                         .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 97311d7..3e249b4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -25,11 +25,13 @@ import static org.mockito.Mockito.when;
 public class KafkaConsumerTest {
 
     private KafkaConfiguration configuration = mock(KafkaConfiguration.class);
+    private KafkaComponent component = mock(KafkaComponent.class);
     private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
     private Processor processor = mock(Processor.class);
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresBootstrapServers() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         new KafkaConsumer(endpoint, processor);
@@ -37,6 +39,7 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresGroupId() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:1234");
         new KafkaConsumer(endpoint, processor);
@@ -44,6 +47,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:2181");

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index b0eaa20..3396544 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -31,7 +31,7 @@ public class KafkaComponentConfiguration {
     /**
      * URL of the Kafka brokers to use. The format is host1:port1host2:port2 and
      * the list can be a subset of brokers or a VIP pointing to a subset of
-     * brokers. This option is known as metadata.broker.list in the Kafka
+     * brokers. This option is known as bootstrap.servers in the Kafka
      * documentation.
      */
     private String brokers;


[2/4] camel git commit: CAMEL-10832: Kafka. Allow to configure brokers on component level. And made topic as part of context-path so using it is similar to JMS etc.

Posted by da...@apache.org.
CAMEL-10832: Kafka. Allow to configure brokers on component level. And made topic as part of context-path so using it is similar to JMS etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d6e45cbe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d6e45cbe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d6e45cbe

Branch: refs/heads/master
Commit: d6e45cbe845b1c8854b0d8b57a4805f082c42277
Parents: eccfc85
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Feb 15 11:03:13 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Feb 15 11:03:13 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 26 ++++++--------------
 .../camel/component/kafka/KafkaComponent.java   |  3 +--
 .../component/kafka/KafkaConfiguration.java     |  3 +--
 .../springboot/KafkaComponentConfiguration.java | 11 +++------
 4 files changed, 14 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 135b13f..f9d2749 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -25,17 +25,7 @@ From Camel 2.17 onwards Scala is no longer used, as we use the kafka java client
 
 [source,java]
 ---------------------------
-kafka:server:port[?options]
-
-OR
-
-kafka:server:port/topicName[?options]
-
-OR
-
-kafka:server/topicName[?options] 
-
-For the option above default port 9092 is used in the URI
+kafka:topic[?options]
 
 ---------------------------
 
@@ -52,7 +42,7 @@ The Kafka component supports 2 options which are listed below.
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| brokers | common |  | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | workerPool | advanced |  | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 |=======================================================================
 {% endraw %}
@@ -68,7 +58,7 @@ The Kafka component supports 80 endpoint options which are listed below:
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | topic | common |  | String | *Required* Name of the topic to use.
-| brokers | common |  | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
@@ -196,7 +186,7 @@ After the message is sent to Kafka, the following headers are available
 Here is the minimal route you need in order to read messages from Kafka.
 [source,java]
 -------------------------------------------------------------
-from("kafka:localhost:9092?topic=test&groupId=testing")
+from("kafka:test?brokers=localhost:9092&groupId=testing")
     .log("Message received from Kafka : ${body}")
     .log("    on the topic ${headers[kafka.TOPIC]}")
     .log("    on the partition ${headers[kafka.PARTITION]}")
@@ -222,7 +212,7 @@ DefaultCamelContext camelContext = new DefaultCamelContext(registry);
 camelContext.addRoutes(new RouteBuilder() {
     @Override
     public void configure() throws Exception {
-        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                      "&groupId=A" +                            //
                      "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
                      "&offsetRepository=#offsetRepo")          // Keep the offsets in the previously configured repository
@@ -240,7 +230,7 @@ Here is the minimal route you need in order to write messages to Kafka.
 from("direct:start")
     .setBody(constant("Message from Camel"))          // Message to send
     .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
-    .to("kafka:localhost:9092?topic=test");
+    .to("kafka:test?brokers=localhost:9092");
 ----------------------------------------------------------------------------
 
 
@@ -251,7 +241,7 @@ You have 2 different ways to configure the SSL communication on the Kafka` compo
 The first way is through the many SSL endpoint parameters
 [source,java]
 -------------------------------------------------------------
-from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +
+from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
              "&groupId=A" +
              "&sslKeystoreLocation=/path/to/keystore.jks" +
              "&sslKeystorePassword=changeit" +
@@ -281,7 +271,7 @@ DefaultCamelContext camelContext = new DefaultCamelContext(registry);
 camelContext.addRoutes(new RouteBuilder() {
     @Override
     public void configure() throws Exception {
-        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                      "&groupId=A" +                            //
                      "&sslContextParameters=#ssl")             // Reference the SSL configuration
                 .to("mock:result");

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 5c7ce9b..575dcfc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -63,8 +63,7 @@ public class KafkaComponent extends UriEndpointComponent {
     }
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
      * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index b42e146..bec73da 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -613,8 +613,7 @@ public class KafkaConfiguration {
     }
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
      * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 9cb1576..b0eaa20 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -29,13 +29,10 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class KafkaComponentConfiguration {
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting
-     * metadata (topics partitions and replicas). The socket connections for
-     * sending the actual data will be established based on the broker
-     * information returned in the metadata. The format is
-     * host1:port1host2:port2 and the list can be a subset of brokers or a VIP
-     * pointing to a subset of brokers. This option is known as
-     * metadata.broker.list in the Kafka documentation.
+     * URL of the Kafka brokers to use. The format is host1:port1host2:port2 and
+     * the list can be a subset of brokers or a VIP pointing to a subset of
+     * brokers. This option is known as metadata.broker.list in the Kafka
+     * documentation.
      */
     private String brokers;
     /**
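
For Spring Boot users, this field surfaces as an auto-configuration property. A short sketch of how it would typically be set in application.properties, assuming the starter's usual camel.component.kafka prefix (the prefix itself is not visible in this hunk):

    # application.properties (illustrative)
    camel.component.kafka.brokers=localhost:9092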


[4/4] camel git commit: CAMEL-10832: Kafka. Allow to configure brokers on component level. And made topic as part of context-path so using it is similar to JMS etc.

Posted by da...@apache.org.
CAMEL-10832: Kafka. Allow to configure brokers on component level. And made topic as part of context-path so using it is similar to JMS etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bcb4ed25
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bcb4ed25
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bcb4ed25

Branch: refs/heads/master
Commit: bcb4ed25b5dc943dac09cc0b79436cadf4fae65e
Parents: d3b38a0
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Feb 15 11:23:28 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Feb 15 11:23:28 2017 +0100

----------------------------------------------------------------------
 examples/camel-example-kafka/README.adoc        | 75 +++++++++++++++++++
 examples/camel-example-kafka/README.md          | 77 --------------------
 .../example/kafka/MessageConsumerClient.java    | 15 ++--
 .../example/kafka/MessagePublisherClient.java   | 27 +++----
 .../camel/example/kafka/StringPartitioner.java  | 34 +--------
 .../src/main/resources/application.properties   | 12 +--
 6 files changed, 96 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/README.adoc
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/README.adoc b/examples/camel-example-kafka/README.adoc
new file mode 100644
index 0000000..83e79d8
--- /dev/null
+++ b/examples/camel-example-kafka/README.adoc
@@ -0,0 +1,75 @@
+# Camel Kafka example
+
+### Introduction
+
+An example which shows how to integrate Camel with Kafka.
+
+This project consists of the following examples:
+
+  1. Send messages continuously by typing on the command line.
+  2. Example of partitioner for a given producer.
+  3. Topic is sent in the header as well as in the URL.
+
+
+### Preparing Kafka
+
+This example requires that a Kafka server is up and running.
+
+You will need to create the following topics before you run the examples.
+
+On Windows run
+
+    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
+    
+    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
+
+On Linux run
+    
+    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
+    
+    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
+
+
+### Build
+
+You will need to compile this example first:
+
+    mvn compile
+
+### Run
+
+Run the consumer first in a separate shell
+
+    
+    mvn compile exec:java -Pkafka-consumer
+
+
+Run the message producer in a separate shell
+
+    
+    mvn compile exec:java -Pkafka-producer
+
+Initially, some messages are sent programmatically. 
+On the command prompt, type the messages. Each line is sent as one message to Kafka.
+Press `Ctrl-C` to exit.
+
+
+### Configuration
+
+You can configure the details in the file:
+  `src/main/resources/application.properties`
+
+You can enable verbose logging by adjusting the `src/main/resources/log4j2.properties`
+  file as documented in the file.
+
+
+### Forum, Help, etc
+
+If you hit any problems, please let us know on the Camel Forums
+	<http://camel.apache.org/discussion-forums.html>
+
+Please help us make Apache Camel better - we appreciate any feedback you may
+have.  Enjoy!
+
+
+The Camel riders!

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/README.md b/examples/camel-example-kafka/README.md
deleted file mode 100644
index 06c7374..0000000
--- a/examples/camel-example-kafka/README.md
+++ /dev/null
@@ -1,77 +0,0 @@
-# Camel Kafka example
-
-### Introduction
-
-An example which shows how to integrate Camel with Kakfa.
-
-This project consists of the following examples:
-
-  1. Send messages continuously by typing on the command line.
-  2. Example of partitioner for a given producer.
-  3. Topic is sent in the header as well as in the URL.
-
-
-### Preparing Kafka
-
-This example requires that Kafka Server is up and running.
-
-You will need to create following topics before you run the examples.
-
-On windows run
-
-    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
-    
-    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
-
-On linux run
-    
-    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
-    
-    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
-
-
-### Build
-
-You will need to compile this example first:
-
-    mvn compile
-
-### Run
-
-1. Run the consumer first in separate shell 
-
-    mvn compile exec:java -Pkafka-consumer
-
-
-2. Run the message producer in the seperate shell
-
-    mvn compile exec:java -Pkafka-producer
-
-   Initially, some messages are sent programmatically. 
-   
-   On the command prompt, type the messages. Each line is sent as one message to kafka
-   
-   Type Ctrl-C to exit.
-
-
-
-### Configuration
-
-You can configure the details in the file:
-  `src/main/resources/application.properties`
-
-You can enable verbose logging by adjusting the `src/main/resources/log4j.properties`
-  file as documented in the file.
-
-
-### Forum, Help, etc
-
-If you hit an problems please let us know on the Camel Forums
-	<http://camel.apache.org/discussion-forums.html>
-
-Please help us make Apache Camel better - we appreciate any feedback you may
-have.  Enjoy!
-
-
-
-The Camel riders!

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
index 47c2abb..7513ba9 100644
--- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
+++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
@@ -29,7 +29,6 @@ public final class MessageConsumerClient {
 
     private MessageConsumerClient() {
     }
-        
 
     public static void main(String[] args) throws Exception {
 
@@ -46,13 +45,13 @@ public final class MessageConsumerClient {
 
                 log.info("About to start route: Kafka Server -> Log ");
 
-                from("kafka:{{kafka.host}}:{{kafka.port}}?" 
-                        + "topic={{consumer.topic}}&" 
-                        + "maxPollRecords={{consumer.maxPollRecords}}&" 
-                        + "consumersCount={{consumer.consumersCount}}&" 
-                        + "seekToBeginning={{consumer.seekToBeginning}}&" 
-                        + "groupId={{consumer.group}}")
-                        .routeId("FromKafka").log("${body}");
+                from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
+                        + "&maxPollRecords={{consumer.maxPollRecords}}"
+                        + "&consumersCount={{consumer.consumersCount}}"
+                        + "&seekToBeginning={{consumer.seekToBeginning}}"
+                        + "&groupId={{consumer.group}}")
+                        .routeId("FromKafka")
+                    .log("${body}");
             }
         });
         camelContext.start();

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
index ee5bd9d..dac7b36 100644
--- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
+++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
@@ -51,27 +51,20 @@ public final class MessagePublisherClient {
                 pc.setLocation("classpath:application.properties");
 
                 from("direct:kafkaStart").routeId("DirectToKafka")
-                        .to("kafka:{{kafka.host}}:{{kafka.port}}?topic={{producer.topic}}").log("${headers}"); // Topic
-                                                                                                                // and
-                                                                                                                // offset
-                                                                                                                // of
-                                                                                                                // the
-                                                                                                                // record
-                                                                                                                // is
-                                                                                                                // returned.
+                    .to("kafka:{{producer.topic}}?brokers={{kafka.host}}:{{kafka.port}}").log("${headers}");
 
                 // Topic can be set in header as well.
 
-                from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic").to("kafka:{{kafka.host}}:{{kafka.port}}")
-                        .log("${headers}"); // Topic and offset of the record is
-                                            // returned.
+                from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic")
+                    .to("kafka:dummy?brokers={{kafka.host}}:{{kafka.port}}")
+                    .log("${headers}");
 
                 // Use custom partitioner based on the key.
 
                 from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner")
-                        .to("kafka:{{kafka.host}}:{{kafka.port}}?topic={{producer.topic}}&partitioner={{producer.partitioner}}")
-                        .log("${headers}"); // Use custom partitioner based on
-                                            // the key.
+                        .to("kafka:{{producer.topic}}?brokers={{kafka.host}}:{{kafka.port}}&partitioner={{producer.partitioner}}")
+                        .log("${headers}");
+
 
                 // Takes input from the command line.
 
@@ -101,14 +94,12 @@ public final class MessagePublisherClient {
 
         testKafkaMessage = "PART 0 :  " + testKafkaMessage;
         Map<String, Object> newHeader = new HashMap<String, Object>();
-        newHeader.put(KafkaConstants.KEY, "AB"); // This should go to partition
-                                                    // 0
+        newHeader.put(KafkaConstants.KEY, "AB"); // This should go to partition 0
 
         producerTemplate.sendBodyAndHeaders("direct:kafkaStartWithPartitioner", testKafkaMessage, newHeader);
 
         testKafkaMessage = "PART 1 :  " + testKafkaMessage;
-        newHeader.put(KafkaConstants.KEY, "ABC"); // This should go to partition
-                                                    // 1
+        newHeader.put(KafkaConstants.KEY, "ABC"); // This should go to partition 1
 
         producerTemplate.sendBodyAndHeaders("direct:kafkaStartWithPartitioner", testKafkaMessage, newHeader);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
index 13d57aa..0566bd1 100644
--- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
+++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,55 +23,29 @@ import org.apache.kafka.common.Cluster;
 
 public class StringPartitioner implements Partitioner {
 
-    /**
-     * 
-     */
     public StringPartitioner() {
-        // TODO Auto-generated constructor stub
+        // noop
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.kafka.common.Configurable#configure(java.util.Map)
-     */
     @Override
     public void configure(Map<String, ?> configs) {
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String,
-     * java.lang.Object, byte[], java.lang.Object, byte[],
-     * org.apache.kafka.common.Cluster)
-     */
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-
         int partId = 0;
 
         if (key != null && key instanceof String) {
-
             String sKey = (String) key;
-
             int len = sKey.length();
 
             // This will return either 1 or zero
-
             partId = len % 2;
-
         }
 
         return partId;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.kafka.clients.producer.Partitioner#close()
-     */
     @Override
     public void close() {
     }
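
To connect this with the partition comments in MessagePublisherClient above: the partitioner does no hashing, it simply takes the key length modulo two, which is why the example keys land where they do.

[source,java]
---------------------------
// "AB"  has length 2 -> 2 % 2 == 0 -> partition 0
// "ABC" has length 3 -> 3 % 2 == 1 -> partition 1
int partitionForAB  = "AB".length() % 2;   // 0
int partitionForABC = "ABC".length() % 2;  // 1
---------------------------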

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/resources/application.properties b/examples/camel-example-kafka/src/main/resources/application.properties
index 1728337..08738bd 100644
--- a/examples/camel-example-kafka/src/main/resources/application.properties
+++ b/examples/camel-example-kafka/src/main/resources/application.properties
@@ -20,31 +20,21 @@
 kafka.host=localhost
 kafka.port=9092
 
-
+kafka.serializerClass=kafka.serializer.StringEncoder
 
 # Producer properties
-
 producer.topic=TestLog
-
 producer.partitioner=org.apache.camel.example.kafka.StringPartitioner
 
-
-kafka.serializerClass=kafka.serializer.StringEncoder
-
 # Consumer properties 
 
 # One consumer can listen to more than one topic [ TestLog, AccessLog ]
-
 consumer.topic=TestLog
-
 consumer.group=kafkaGroup
-
 consumer.maxPollRecords=5000
 
 # No of consumers that connect to Kafka server
-
 consumer.consumersCount=1
 
 # Get records from the beginning
-
 consumer.seekToBeginning=true