You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/02/27 20:52:54 UTC
[incubator-streampipes] 01/02: [hotfix] Modify assignment of groupId
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.69.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 2b19458cb3f46beac9179a36cf20e0e95aab3fa6
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 27 20:41:54 2022 +0100
[hotfix] Modify assignment of groupId
---
.../streampipes/messaging/kafka/config/ConsumerConfigFactory.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 30dfbdd..2aa488a 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -19,7 +19,6 @@ package org.apache.streampipes.messaging.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import java.util.Properties;
@@ -42,7 +41,7 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
public Properties makeDefaultProperties() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId, UUID.randomUUID().toString()));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT);