You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/07 19:52:28 UTC
[camel] branch main updated: [camel-18327] resuming from last committed offset
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 95d3905893c [camel-18327] resuming from last committed offset
95d3905893c is described below
commit 95d3905893cc4defa59fb5d27125a1b4dad7be56
Author: geekr <ge...@gmail.com>
AuthorDate: Fri Oct 7 13:54:42 2022 -0400
[camel-18327] resuming from last committed offset
---
.../org/apache/camel/component/kafka/KafkaFetchRecords.java | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 5684375bbdd..6e482a1df64 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -41,7 +41,9 @@ import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.TimeUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
@@ -381,6 +383,14 @@ public class KafkaFetchRecords implements Runnable {
break;
case RESUME_REQUESTED:
LOG.info("Resuming the consumer as a response to a resume request");
+ if (consumer.committed(this.consumer.assignment()) != null) {
+ consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
+ final TopicPartition tp = (TopicPartition) k;
+ LOG.info("Resuming from the offset {} for the topic {} with partition {}",
+ ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
+ consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+ });
+ }
consumer.resume(consumer.assignment());
state = State.RUNNING;
break;