You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/08/29 11:41:35 UTC

camel git commit: Use unique groupId by default

Repository: camel
Updated Branches:
  refs/heads/master a2d2a1f35 -> 6c24e43f1


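Use unique groupId by default: when no groupId is configured, the Kafka consumer now generates a random UUID as its group id instead of failing with an IllegalArgumentException.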


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c24e43f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c24e43f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c24e43f

Branch: refs/heads/master
Commit: 6c24e43f18d2ee5ce6c6375bbec571ce6ad7f5b1
Parents: a2d2a1f
Author: Ruuskanen Jyrki <jy...@finavia.fi>
Authored: Thu Jun 29 13:13:35 2017 +0300
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Tue Aug 29 13:36:43 2017 +0200

----------------------------------------------------------------------
 .../camel-kafka/src/main/docs/kafka-component.adoc  |  2 +-
 .../apache/camel/component/kafka/KafkaConsumer.java | 16 +++++++++++-----
 .../component/kafka/KafkaConsumerBatchSizeTest.java |  3 +--
 .../camel/component/kafka/KafkaConsumerTest.java    | 11 +----------
 4 files changed, 14 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f474877..75fa6c2 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -207,7 +207,7 @@ After the message is sent to Kafka, the following headers are available
 Here is the minimal route you need in order to read messages from Kafka.
 [source,java]
 -------------------------------------------------------------
-from("kafka:test?brokers=localhost:9092&groupId=testing")
+from("kafka:test?brokers=localhost:9092")
     .log("Message received from Kafka : ${body}")
     .log("    on the topic ${headers[kafka.TOPIC]}")
     .log("    on the partition ${headers[kafka.PARTITION]}")

http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 26bb126..48125ac 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
@@ -61,10 +62,6 @@ public class KafkaConsumer extends DefaultConsumer {
         if (ObjectHelper.isEmpty(brokers)) {
             throw new IllegalArgumentException("Brokers must be configured");
         }
-
-        if (endpoint.getConfiguration().getGroupId() == null) {
-            throw new IllegalArgumentException("groupId must not be null");
-        }
     }
 
     Properties getProps() {
@@ -78,7 +75,16 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getConfiguration().getGroupId());
+
+        if (endpoint.getConfiguration().getGroupId() != null) {
+            String groupId = endpoint.getConfiguration().getGroupId();
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+            log.debug("Kafka consumer groupId is {}", groupId);
+        } else {
+            String randomGroupId = UUID.randomUUID().toString();
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, randomGroupId);
+            log.debug("Kafka consumer groupId is {} (generated)", randomGroupId);
+        }
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 8f1bfa1..24fdcbf 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -34,8 +34,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
     public static final String TOPIC = "test";
 
     @EndpointInject(uri = "kafka:" + TOPIC
-            + "?groupId=group1"
-            + "&autoOffsetReset=earliest"
+            + "?autoOffsetReset=earliest"
             + "&autoCommitEnable=false"
             + "&consumerStreams=10"
     )

http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 3e249b4..335e883 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -37,19 +37,10 @@ public class KafkaConsumerTest {
         new KafkaConsumer(endpoint, processor);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void consumerRequiresGroupId() throws Exception {
-        when(endpoint.getComponent()).thenReturn(component);
-        when(endpoint.getConfiguration()).thenReturn(configuration);
-        when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:1234");
-        new KafkaConsumer(endpoint, processor);
-    }
-
     @Test
-    public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
+    public void consumerOnlyRequiresBootstrapServers() throws Exception {
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
-        when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:2181");
         new KafkaConsumer(endpoint, processor);
     }