You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/28 19:29:26 UTC

[camel] branch main updated: CAMEL-18020: fix a possible NPE due to uninitialized consumer in poll exception strategies

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 33807af8f58 CAMEL-18020: fix a possible NPE due to uninitialized consumer in poll exception strategies
33807af8f58 is described below

commit 33807af8f5881a4b1619a336db81ccfbafb6858d
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Apr 28 15:02:41 2022 +0200

    CAMEL-18020: fix a possible NPE due to uninitialized consumer in poll exception strategies
---
 .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java    | 6 +++---
 .../component/kafka/consumer/errorhandler/KafkaErrorStrategies.java | 2 ++
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index c2002826d7c..0f090a34d41 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -73,7 +73,7 @@ public class KafkaFetchRecords implements Runnable {
     private final Pattern topicPattern;
     private final String threadId;
     private final Properties kafkaProps;
-    private final PollExceptionStrategy pollExceptionStrategy;
+    private PollExceptionStrategy pollExceptionStrategy;
     private final BridgeExceptionHandlerToErrorHandler bridge;
     private final ReentrantLock lock = new ReentrantLock();
     private CommitManager commitManager;
@@ -98,8 +98,6 @@ public class KafkaFetchRecords implements Runnable {
         this.consumerListener = consumerListener;
         this.threadId = topicName + "-" + "Thread " + id;
         this.kafkaProps = kafkaProps;
-
-        this.pollExceptionStrategy = KafkaErrorStrategies.strategies(this, kafkaConsumer.getEndpoint(), consumer);
     }
 
     @Override
@@ -257,6 +255,8 @@ public class KafkaFetchRecords implements Runnable {
                     }
                 }
             }
+
+            this.pollExceptionStrategy = KafkaErrorStrategies.strategies(this, kafkaConsumer.getEndpoint(), consumer);
         } finally {
             Thread.currentThread().setContextClassLoader(threadClassLoader);
         }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
index 3541e08dbce..c02bf78d1eb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
@@ -32,6 +32,8 @@ public final class KafkaErrorStrategies {
 
     public static PollExceptionStrategy strategies(
             KafkaFetchRecords recordFetcher, KafkaEndpoint endpoint, Consumer<?, ?> consumer) {
+        assert consumer != null;
+
         PollExceptionStrategy strategy = endpoint.getComponent().getPollExceptionStrategy();
         if (strategy != null) {
             return strategy;