You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/07 19:52:28 UTC

[camel] branch main updated: [camel-18327] resuming from last committed offset

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 95d3905893c [camel-18327] resuming from last committed offset
95d3905893c is described below

commit 95d3905893cc4defa59fb5d27125a1b4dad7be56
Author: geekr <ge...@gmail.com>
AuthorDate: Fri Oct 7 13:54:42 2022 -0400

    [camel-18327] resuming from last committed offset
---
 .../org/apache/camel/component/kafka/KafkaFetchRecords.java    | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 5684375bbdd..6e482a1df64 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -41,7 +41,9 @@ import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
@@ -381,6 +383,14 @@ public class KafkaFetchRecords implements Runnable {
                 break;
             case RESUME_REQUESTED:
                 LOG.info("Resuming the consumer as a response to a resume request");
+                if (consumer.committed(this.consumer.assignment()) != null) {
+                    consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
+                        final TopicPartition tp = (TopicPartition) k;
+                        LOG.info("Resuming from the offset {} for the topic {} with partition {}",
+                                ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
+                        consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+                    });
+                }
                 consumer.resume(consumer.assignment());
                 state = State.RUNNING;
                 break;