You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/10 07:32:03 UTC

[camel] branch main updated: CAMEL-17424: do handle Authorization/Authentication issues when failling to poll

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 55df049a96f CAMEL-17424: do handle Authorization/Authentication issues when failling to poll
55df049a96f is described below

commit 55df049a96fd8f52265ef7e7a0cc9ca5a28ab6b3
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Jun 7 10:35:20 2022 +0200

    CAMEL-17424: do handle Authorization/Authentication issues when failling to poll
---
 .../kafka/consumer/errorhandler/BridgeErrorStrategy.java         | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
index fdc2c49b0fd..f8b00c92923 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
@@ -20,6 +20,8 @@ package org.apache.camel.component.kafka.consumer.errorhandler;
 import org.apache.camel.component.kafka.KafkaFetchRecords;
 import org.apache.camel.component.kafka.PollExceptionStrategy;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,6 +29,7 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(BridgeErrorStrategy.class);
     private final KafkaFetchRecords recordFetcher;
     private final Consumer<?, ?> consumer;
+    private boolean continueFlag = true; // whether to continue polling or not
 
     public BridgeErrorStrategy(KafkaFetchRecords recordFetcher, Consumer<?, ?> consumer) {
         this.recordFetcher = recordFetcher;
@@ -35,7 +38,7 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
 
     @Override
     public boolean canContinue() {
-        return true;
+        return continueFlag;
     }
 
     @Override
@@ -46,5 +49,9 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
         recordFetcher.getBridge().handleException(exception);
         // skip this poison message and seek to next message
         SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
+
+        if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
+            continueFlag = false;
+        }
     }
 }