You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/16 09:36:04 UTC
(camel) branch camel-3.22.x updated: CAMEL-20218: Use a single Kafka consumer with assign() call to consume all cached entries from the beginning of the Kafka topic.
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-3.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.22.x by this push:
new c5bf53188b9 CAMEL-20218: Use a single Kafka consumer with assign() call to consume all cached entries from the beginning of the Kafka topic.
c5bf53188b9 is described below
commit c5bf53188b972a54b9782450990ae4088af918ff
Author: Arseniy Tashoyan <ar...@swisscom.com>
AuthorDate: Mon Jan 15 15:44:13 2024 +0100
CAMEL-20218: Use a single Kafka consumer with assign() call to consume all cached entries from the beginning of the Kafka topic.
---
.../kafka/KafkaIdempotentRepository.java | 71 +++++++++++-----------
1 file changed, 37 insertions(+), 34 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 8087ae40e95..3ae33f5a546 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -18,15 +18,15 @@ package org.apache.camel.processor.idempotent.kafka;
import java.time.Duration;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
@@ -41,7 +41,6 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -49,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -115,6 +115,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
}
+ /**
+ * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+ */
+ @Deprecated
public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) {
this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
}
@@ -130,6 +134,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
}
+ /**
+ * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+ */
+ @Deprecated
public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) {
this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
}
@@ -143,6 +151,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
this.pollDurationMs = pollDurationMs;
}
+ /**
+ * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+ */
+ @Deprecated
public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs,
String groupId) {
this.topic = topic;
@@ -152,6 +164,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
this.groupId = groupId;
}
+ /**
+ * @deprecated Use the constructor without groupId; the parameter groupId is ignored.
+ */
+ @Deprecated
public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize,
int pollDurationMs, String groupId) {
this.topic = topic;
@@ -279,6 +295,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
this.pollDurationMs = pollDurationMs;
}
+ /**
+ * @deprecated The parameter groupId is ignored.
+ */
+ @Deprecated
public String getGroupId() {
return groupId;
}
@@ -287,7 +307,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
* Sets the group id of the Kafka consumer.
*
* @param groupId The poll duration in milliseconds.
+ * @deprecated The parameter groupId is ignored.
*/
+ @Deprecated
public void setGroupId(String groupId) {
this.groupId = groupId;
}
@@ -325,15 +347,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
ObjectHelper.notNull(consumerConfig, "consumerConfig");
ObjectHelper.notNull(producerConfig, "producerConfig");
- // each consumer instance must have control over its own offset, so
- // assign a groupID at random if not specified
- if (ObjectHelper.isEmpty(groupId)) {
- groupId = UUID.randomUUID().toString();
- }
- log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
-
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
+ consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -465,28 +479,17 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
@Override
public void run() {
- log.debug("Subscribing consumer to {}", topic);
- consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> collection) {
- }
+ log.debug("Getting partitions of topic {}", topic);
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ Collection<TopicPartition> partitions = partitionInfos.stream()
+ .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+ .collect(Collectors.toUnmodifiableList());
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> collection) {
- // Whenever a partition is assigned, we want to consume from the beginning to guarantee all the
- // existing entries in the topic/partition are added to the cache
- log.debug("Seeking to beginning");
- consumer.seekToBeginning(collection);
- }
- });
-
- // According to the Kafka documentation: "Rebalances will only occur during an active call to poll, so
- // callbacks will also only be invoked during that time".
- // We can safely trigger a poll(0) because the consumer doesn't have any record pre-fetched.
- log.debug("Forcing rebalance to get partitions assigned");
- if (!consumer.poll(Duration.ofMillis(0)).isEmpty()) {
- throw new IllegalStateException("First call to Kafka consumer.poll(0) should never return any record");
- }
+ log.debug("Assigning consumer to partitions {}", partitions);
+ consumer.assign(partitions);
+
+ log.debug("Seeking consumer to beginning of partitions {}", partitions);
+ consumer.seekToBeginning(partitions);
POLL_LOOP: while (running.get()) {
log.trace("Polling");
@@ -529,7 +532,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
continue POLL_LOOP;
}
}
-
+ consumer.commitSync();
}
log.debug("TopicPoller finished - triggering shutdown latch");
shutdownLatch.countDown();