You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/12 13:55:27 UTC

[camel] 03/03: [camel-18327] resuming from last committed offset

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f519f84b7df11ebc12bf475c70bd35a126f480c0
Author: geekr <ge...@gmail.com>
AuthorDate: Fri Oct 7 13:54:42 2022 -0400

    [camel-18327] resuming from last committed offset
---
 .../org/apache/camel/component/kafka/KafkaFetchRecords.java    | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index edf39671436..079a14615ab 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -41,7 +41,9 @@ import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
@@ -380,6 +382,14 @@ public class KafkaFetchRecords implements Runnable {
                 break;
             case RESUME_REQUESTED:
                 LOG.info("Resuming the consumer as a response to a resume request");
+                if (consumer.committed(this.consumer.assignment()) != null) {
+                    consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
+                        final TopicPartition tp = (TopicPartition) k;
+                        LOG.info("Resuming from the offset {} for the topic {} with partition {}",
+                                ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
+                        consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+                    });
+                }
                 consumer.resume(consumer.assignment());
                 state = State.RUNNING;
                 break;