You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/16 09:36:04 UTC

(camel) branch camel-3.22.x updated: CAMEL-20218: Use a single Kafka consumer with assign() call to consume all cached entries from the beginning of the Kafka topic.

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.22.x by this push:
     new c5bf53188b9 CAMEL-20218: Use a single Kafka consumer with assign() call to consume all cached entries from the beginning of the Kafka topic.
c5bf53188b9 is described below

commit c5bf53188b972a54b9782450990ae4088af918ff
Author: Arseniy Tashoyan <ar...@swisscom.com>
AuthorDate: Mon Jan 15 15:44:13 2024 +0100

    CAMEL-20218: Use a single Kafka consumer with assign() call to consume all cached entries from the beginning of the Kafka topic.
---
 .../kafka/KafkaIdempotentRepository.java           | 71 +++++++++++-----------
 1 file changed, 37 insertions(+), 34 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 8087ae40e95..3ae33f5a546 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -18,15 +18,15 @@ package org.apache.camel.processor.idempotent.kafka;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
@@ -41,7 +41,6 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -49,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -115,6 +115,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
+    /**
+     * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+     */
+    @Deprecated
     public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) {
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
     }
@@ -130,6 +134,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
+    /**
+     * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+     */
+    @Deprecated
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) {
         this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
     }
@@ -143,6 +151,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
+    /**
+     * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+     */
+    @Deprecated
     public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs,
                                      String groupId) {
         this.topic = topic;
@@ -152,6 +164,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.groupId = groupId;
     }
 
+    /**
+     * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+     */
+    @Deprecated
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize,
                                      int pollDurationMs, String groupId) {
         this.topic = topic;
@@ -279,6 +295,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
+    /**
+     * @deprecated The parameter groupId is ignored.
+     */
+    @Deprecated
     public String getGroupId() {
         return groupId;
     }
@@ -287,7 +307,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
      * Sets the group id of the Kafka consumer.
      *
      * @param groupId The group id of the Kafka consumer.
+     * @deprecated The parameter groupId is ignored.
      */
+    @Deprecated
     public void setGroupId(String groupId) {
         this.groupId = groupId;
     }
@@ -325,15 +347,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         ObjectHelper.notNull(consumerConfig, "consumerConfig");
         ObjectHelper.notNull(producerConfig, "producerConfig");
 
-        // each consumer instance must have control over its own offset, so
-        // assign a groupID at random if not specified
-        if (ObjectHelper.isEmpty(groupId)) {
-            groupId = UUID.randomUUID().toString();
-        }
-        log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
-
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
+        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
@@ -465,28 +479,17 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
 
         @Override
         public void run() {
-            log.debug("Subscribing consumer to {}", topic);
-            consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
-                @Override
-                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-                }
+            log.debug("Getting partitions of topic {}", topic);
+            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+            Collection<TopicPartition> partitions = partitionInfos.stream()
+                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+                    .collect(Collectors.toUnmodifiableList());
 
-                @Override
-                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
-                    // Whenever a partition is assigned, we want to consume from the beginning to guarantee all the
-                    // existing entries in the topic/partition are added to the cache
-                    log.debug("Seeking to beginning");
-                    consumer.seekToBeginning(collection);
-                }
-            });
-
-            // According to the Kafka documentation: "Rebalances will only occur during an active call to poll, so
-            // callbacks will also only be invoked during that time".
-            // We can safely trigger a poll(0) because the consumer doesn't have any record pre-fetched.
-            log.debug("Forcing rebalance to get partitions assigned");
-            if (!consumer.poll(Duration.ofMillis(0)).isEmpty()) {
-                throw new IllegalStateException("First call to Kafka consumer.poll(0) should never return any record");
-            }
+            log.debug("Assigning consumer to partitions {}", partitions);
+            consumer.assign(partitions);
+
+            log.debug("Seeking consumer to beginning of partitions {}", partitions);
+            consumer.seekToBeginning(partitions);
 
             POLL_LOOP: while (running.get()) {
                 log.trace("Polling");
@@ -529,7 +532,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
                         continue POLL_LOOP;
                     }
                 }
-
+                consumer.commitSync();
             }
             log.debug("TopicPoller finished - triggering shutdown latch");
             shutdownLatch.countDown();