You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/02/27 20:52:54 UTC

[incubator-streampipes] 01/02: [hotfix] Modify assignment of groupId

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.69.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 2b19458cb3f46beac9179a36cf20e0e95aab3fa6
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 27 20:41:54 2022 +0100

    [hotfix] Modify assignment of groupId
---
 .../streampipes/messaging/kafka/config/ConsumerConfigFactory.java      | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 30dfbdd..2aa488a 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -19,7 +19,6 @@ package org.apache.streampipes.messaging.kafka.config;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 import java.util.Properties;
@@ -42,7 +41,7 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   public Properties makeDefaultProperties() {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId, UUID.randomUUID().toString()));
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
             AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT);