You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/04/05 09:21:18 UTC

[camel] 01/03: CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7bb96e8a976cfd33922d11eb341c319f96be1703
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Apr 5 10:53:21 2023 +0200

    CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 .../kafka/KafkaIdempotentRepository.java           | 47 ++++++++++++-
 .../KafkaConsumerIdempotentGroupIdIT.java          | 80 ++++++++++++++++++++++
 2 files changed, 125 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 3e2d3ca0193..42715ce1aa3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -82,6 +82,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     // configurable
     private String topic;
     private String bootstrapServers;
+
+    private String groupId = null;
     private Properties producerConfig;
     private Properties consumerConfig;
     private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
@@ -113,6 +115,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
+    public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) {
+        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
+    }
+
     public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs) {
         this.topic = topic;
         this.bootstrapServers = bootstrapServers;
@@ -124,6 +130,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) {
+        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
+    }
+
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize,
                                      int pollDurationMs) {
         this.topic = topic;
@@ -133,6 +143,24 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
+    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId) {
+        this.groupId = groupId;
+        this.pollDurationMs = pollDurationMs;
+        this.maxCacheSize = maxCacheSize;
+        this.bootstrapServers = bootstrapServers;
+        this.topic = topic;
+    }
+
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize,
+                                     int pollDurationMs, String groupId) {
+        this.groupId = groupId;
+        this.pollDurationMs = pollDurationMs;
+        this.maxCacheSize = maxCacheSize;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.topic = topic;
+    }
+
     public String getTopic() {
         return topic;
     }
@@ -250,6 +278,19 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    /**
+     * Sets the group id of the Kafka consumer.
+     *
+     * @param groupId The consumer group id; if left empty, a random UUID is used instead.
+     */
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
     @Override
     public void setCamelContext(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -284,8 +325,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         ObjectHelper.notNull(producerConfig, "producerConfig");
 
         // each consumer instance must have control over its own offset, so
-        // assign a groupID at random
-        String groupId = UUID.randomUUID().toString();
+        // assign a groupID at random if not specified
+        if (ObjectHelper.isEmpty(groupId)) {
+            groupId = UUID.randomUUID().toString();
+        }
         log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
         consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
new file mode 100644
index 00000000000..09937f7542b
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+
+import java.util.Arrays;
+
+import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader;
+
+/** Integration test: idempotent Kafka consumer backed by a repository with a user-defined group id (CAMEL-19180). */
+@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "false")
+public class KafkaConsumerIdempotentGroupIdIT extends KafkaConsumerIdempotentTestSupport {
+
+    public static final String TOPIC = "idempt";
+
+    private final int size = 200;
+
+    // The third constructor argument is the explicit group id under test, replacing the random default.
+    @BindToRegistry("kafkaIdempotentRepository")
+    private KafkaIdempotentRepository testIdempotent = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers(), "test_1");
+
+    @BeforeEach
+    public void before() {
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+        doSend(size, TOPIC);
+    }
+
+    @AfterEach
+    public void after() {
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("kafka:" + TOPIC
+                     + "?groupId=KafkaConsumerIdempotentIT&autoOffsetReset=earliest"
+                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                     + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
+                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor").routeId("foo")
+                        .idempotentConsumer(numericHeader("id"))
+                        .idempotentRepository("kafkaIdempotentRepository")
+                        .to(KafkaTestUtil.MOCK_RESULT);
+            }
+        };
+    }
+
+    @Test
+    @DisplayName("Idempotent consumer works with a user-defined repository group id (CAMEL-19180)")
+    void kafkaIdempotentMessageIsConsumedByCamel() {
+        MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
+        doRun(to, size);
+    }
+}