You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/08/29 11:41:35 UTC
camel git commit: Use unique groupId by default
Repository: camel
Updated Branches:
refs/heads/master a2d2a1f35 -> 6c24e43f1
Use unique groupId by default
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c24e43f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c24e43f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c24e43f
Branch: refs/heads/master
Commit: 6c24e43f18d2ee5ce6c6375bbec571ce6ad7f5b1
Parents: a2d2a1f
Author: Ruuskanen Jyrki <jy...@finavia.fi>
Authored: Thu Jun 29 13:13:35 2017 +0300
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Tue Aug 29 13:36:43 2017 +0200
----------------------------------------------------------------------
.../camel-kafka/src/main/docs/kafka-component.adoc | 2 +-
.../apache/camel/component/kafka/KafkaConsumer.java | 16 +++++++++++-----
.../component/kafka/KafkaConsumerBatchSizeTest.java | 3 +--
.../camel/component/kafka/KafkaConsumerTest.java | 11 +----------
4 files changed, 14 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f474877..75fa6c2 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -207,7 +207,7 @@ After the message is sent to Kafka, the following headers are available
Here is the minimal route you need in order to read messages from Kafka.
[source,java]
-------------------------------------------------------------
-from("kafka:test?brokers=localhost:9092&groupId=testing")
+from("kafka:test?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 26bb126..48125ac 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Exchange;
@@ -61,10 +62,6 @@ public class KafkaConsumer extends DefaultConsumer {
if (ObjectHelper.isEmpty(brokers)) {
throw new IllegalArgumentException("Brokers must be configured");
}
-
- if (endpoint.getConfiguration().getGroupId() == null) {
- throw new IllegalArgumentException("groupId must not be null");
- }
}
Properties getProps() {
@@ -78,7 +75,16 @@ public class KafkaConsumer extends DefaultConsumer {
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getConfiguration().getGroupId());
+
+ if (endpoint.getConfiguration().getGroupId() != null) {
+ String groupId = endpoint.getConfiguration().getGroupId();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ log.debug("Kafka consumer groupId is {}", groupId);
+ } else {
+ String randomGroupId = UUID.randomUUID().toString();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, randomGroupId);
+ log.debug("Kafka consumer groupId is {} (generated)", randomGroupId);
+ }
return props;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 8f1bfa1..24fdcbf 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -34,8 +34,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
public static final String TOPIC = "test";
@EndpointInject(uri = "kafka:" + TOPIC
- + "?groupId=group1"
- + "&autoOffsetReset=earliest"
+ + "?autoOffsetReset=earliest"
+ "&autoCommitEnable=false"
+ "&consumerStreams=10"
)
http://git-wip-us.apache.org/repos/asf/camel/blob/6c24e43f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 3e249b4..335e883 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -37,19 +37,10 @@ public class KafkaConsumerTest {
new KafkaConsumer(endpoint, processor);
}
- @Test(expected = IllegalArgumentException.class)
- public void consumerRequiresGroupId() throws Exception {
- when(endpoint.getComponent()).thenReturn(component);
- when(endpoint.getConfiguration()).thenReturn(configuration);
- when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:1234");
- new KafkaConsumer(endpoint, processor);
- }
-
@Test
- public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
+ public void consumerOnlyRequiresBootstrapServers() throws Exception {
when(endpoint.getComponent()).thenReturn(component);
when(endpoint.getConfiguration()).thenReturn(configuration);
- when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:2181");
new KafkaConsumer(endpoint, processor);
}