You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/04/05 09:21:54 UTC

[camel] branch camel-3.x updated (bcfcc90c523 -> 2d37c05acd9)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


    from bcfcc90c523 CAMEL-19242: camel-jbang - camel doc may not work on main
     new cfc5a88c10c CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form
     new d44c275f0fd CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form
     new 2d37c05acd9 CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel-kafka/src/main/docs/kafka-component.adoc |  1 +
 .../kafka/KafkaIdempotentRepository.java           | 48 +++++++++++++++++++++-
 ....java => KafkaConsumerIdempotentGroupIdIT.java} | 37 +++++++----------
 3 files changed, 62 insertions(+), 24 deletions(-)
 copy components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/{KafkaConsumerIdempotentIT.java => KafkaConsumerIdempotentGroupIdIT.java} (72%)


[camel] 03/03: CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2d37c05acd9eb2cd49d201dcd62e97bdb19864a5
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Apr 5 11:09:39 2023 +0200

    CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 components/camel-kafka/src/main/docs/kafka-component.adoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 07f431090dd..a2aed963d68 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -242,6 +242,7 @@ A `KafkaIdempotentRepository` has the following properties:
 | maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000).
 | pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. +
 If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache ca [...]
+| groupId | The groupId to assign to the idempotent consumer. If not specified it will be randomize.
 |===
 
 The repository can be instantiated by defining the `topic` and `bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets can be explicitly defined to enable features such as SSL/SASL.


[camel] 02/03: CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d44c275f0fd7ac78189bd53176c3aeb53a264c23
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Apr 5 11:08:07 2023 +0200

    CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 .../processor/idempotent/kafka/KafkaIdempotentRepository.java      | 3 ++-
 .../kafka/integration/KafkaConsumerIdempotentGroupIdIT.java        | 7 ++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 89debcfaeb9..07005a529a2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -143,7 +143,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
-    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId) {
+    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs,
+                                     String groupId) {
         this.topic = topic;
         this.bootstrapServers = bootstrapServers;
         this.maxCacheSize = maxCacheSize;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
index 09937f7542b..7d3f7f16a8d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.kafka.integration;
 
+import java.util.Arrays;
+
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
@@ -27,8 +29,6 @@ import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
-import java.util.Arrays;
-
 import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader;
 
 @DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "false")
@@ -39,7 +39,8 @@ public class KafkaConsumerIdempotentGroupIdIT extends KafkaConsumerIdempotentTes
     private int size = 200;
 
     @BindToRegistry("kafkaIdempotentRepository")
-    private KafkaIdempotentRepository testIdempotent = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers(), "test_1");
+    private KafkaIdempotentRepository testIdempotent
+            = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers(), "test_1");
 
     @BeforeEach
     public void before() {


[camel] 01/03: CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cfc5a88c10cf19fa4baa74f0f793751cf73bb894
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Apr 5 10:53:21 2023 +0200

    CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 .../kafka/KafkaIdempotentRepository.java           | 47 ++++++++++++-
 .../KafkaConsumerIdempotentGroupIdIT.java          | 80 ++++++++++++++++++++++
 2 files changed, 125 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 8ae44575332..89debcfaeb9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -82,6 +82,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     // configurable
     private String topic;
     private String bootstrapServers;
+
+    private String groupId = null;
     private Properties producerConfig;
     private Properties consumerConfig;
     private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
@@ -113,6 +115,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
+    public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) {
+        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
+    }
+
     public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs) {
         this.topic = topic;
         this.bootstrapServers = bootstrapServers;
@@ -124,6 +130,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) {
+        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
+    }
+
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize,
                                      int pollDurationMs) {
         this.topic = topic;
@@ -133,6 +143,24 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
+    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId) {
+        this.topic = topic;
+        this.bootstrapServers = bootstrapServers;
+        this.maxCacheSize = maxCacheSize;
+        this.pollDurationMs = pollDurationMs;
+        this.groupId = groupId;
+    }
+
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize,
+                                     int pollDurationMs, String groupId) {
+        this.topic = topic;
+        this.consumerConfig = consumerConfig;
+        this.producerConfig = producerConfig;
+        this.maxCacheSize = maxCacheSize;
+        this.pollDurationMs = pollDurationMs;
+        this.groupId = groupId;
+    }
+
     public String getTopic() {
         return topic;
     }
@@ -250,6 +278,19 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
+    public String getGroupId() {
+        return groupId;
+    }
+
+    /**
+     * Sets the group id of the Kafka consumer.
+     *
+     * @param groupId The poll duration in milliseconds.
+     */
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
     @Override
     public void setCamelContext(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -284,8 +325,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         ObjectHelper.notNull(producerConfig, "producerConfig");
 
         // each consumer instance must have control over its own offset, so
-        // assign a groupID at random
-        String groupId = UUID.randomUUID().toString();
+        // assign a groupID at random if not specified
+        if (ObjectHelper.isEmpty(groupId)) {
+            groupId = UUID.randomUUID().toString();
+        }
         log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
         consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
new file mode 100644
index 00000000000..09937f7542b
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+
+import java.util.Arrays;
+
+import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader;
+
+@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "false")
+public class KafkaConsumerIdempotentGroupIdIT extends KafkaConsumerIdempotentTestSupport {
+
+    public static final String TOPIC = "idempt";
+
+    private int size = 200;
+
+    @BindToRegistry("kafkaIdempotentRepository")
+    private KafkaIdempotentRepository testIdempotent = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers(), "test_1");
+
+    @BeforeEach
+    public void before() {
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+        doSend(size, TOPIC);
+    }
+
+    @AfterEach
+    public void after() {
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("kafka:" + TOPIC
+                     + "?groupId=KafkaConsumerIdempotentIT&autoOffsetReset=earliest"
+                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                     + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
+                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor").routeId("foo")
+                        .idempotentConsumer(numericHeader("id"))
+                        .idempotentRepository("kafkaIdempotentRepository")
+                        .to(KafkaTestUtil.MOCK_RESULT);
+            }
+        };
+    }
+
+    @Test
+    @DisplayName("Numeric headers is consumable when using idempotent (CAMEL-16914)")
+    void kafkaIdempotentMessageIsConsumedByCamel() {
+        MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
+
+        doRun(to, size);
+    }
+}