You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/20 05:08:16 UTC
[camel] branch main updated: [camel-18627] added null check to check if there was previously committed offset before resuming
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ce71c1e5304 [camel-18627] added null check to check if there was previously committed offset before resuming
ce71c1e5304 is described below
commit ce71c1e5304b933bc95a3478f45e006cacbfd350
Author: geekr <ge...@gmail.com>
AuthorDate: Wed Oct 19 17:56:12 2022 -0400
[camel-18627] added null check to check if there was previously committed offset before resuming
---
.../org/apache/camel/component/kafka/KafkaFetchRecords.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 079a14615ab..f94e3e118a5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -384,10 +384,12 @@ public class KafkaFetchRecords implements Runnable {
LOG.info("Resuming the consumer as a response to a resume request");
if (consumer.committed(this.consumer.assignment()) != null) {
consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
- final TopicPartition tp = (TopicPartition) k;
- LOG.info("Resuming from the offset {} for the topic {} with partition {}",
- ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
- consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+ if (v != null) {
+ final TopicPartition tp = (TopicPartition) k;
+ LOG.info("Resuming from the offset {} for the topic {} with partition {}",
+ ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
+ consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+ }
});
}
consumer.resume(consumer.assignment());