You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/20 08:29:57 UTC

[camel] branch camel-3.18.x updated: [camel-18627] added null check to check if there was a previously committed offset before resuming (#8588)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new c40b78e4986 [camel-18627] added null check to check if there was a previously committed offset before resuming (#8588)
c40b78e4986 is described below

commit c40b78e4986210ec5b99e29dabb3957925054d95
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Thu Oct 20 10:29:51 2022 +0200

    [camel-18627] added null check to check if there was a previously committed offset before resuming (#8588)
    
    Co-authored-by: geekr <ge...@gmail.com>
---
 .../org/apache/camel/component/kafka/KafkaFetchRecords.java    | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 079a14615ab..f94e3e118a5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -384,10 +384,12 @@ public class KafkaFetchRecords implements Runnable {
                 LOG.info("Resuming the consumer as a response to a resume request");
                 if (consumer.committed(this.consumer.assignment()) != null) {
                     consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
-                        final TopicPartition tp = (TopicPartition) k;
-                        LOG.info("Resuming from the offset {} for the topic {} with partition {}",
-                                ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
-                        consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+                        if (v != null) {
+                            final TopicPartition tp = (TopicPartition) k;
+                            LOG.info("Resuming from the offset {} for the topic {} with partition {}",
+                                    ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
+                            consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+                        }
                     });
                 }
                 consumer.resume(consumer.assignment());