You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/10 07:32:03 UTC
[camel] branch main updated: CAMEL-17424: do handle Authorization/Authentication issues when failling to poll
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 55df049a96f CAMEL-17424: do handle Authorization/Authentication issues when failling to poll
55df049a96f is described below
commit 55df049a96fd8f52265ef7e7a0cc9ca5a28ab6b3
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Jun 7 10:35:20 2022 +0200
CAMEL-17424: do handle Authorization/Authentication issues when failling to poll
---
.../kafka/consumer/errorhandler/BridgeErrorStrategy.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
index fdc2c49b0fd..f8b00c92923 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
@@ -20,6 +20,8 @@ package org.apache.camel.component.kafka.consumer.errorhandler;
import org.apache.camel.component.kafka.KafkaFetchRecords;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +29,7 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(BridgeErrorStrategy.class);
private final KafkaFetchRecords recordFetcher;
private final Consumer<?, ?> consumer;
+ private boolean continueFlag = true; // whether to continue polling or not
public BridgeErrorStrategy(KafkaFetchRecords recordFetcher, Consumer<?, ?> consumer) {
this.recordFetcher = recordFetcher;
@@ -35,7 +38,7 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
@Override
public boolean canContinue() {
- return true;
+ return continueFlag;
}
@Override
@@ -46,5 +49,9 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
recordFetcher.getBridge().handleException(exception);
// skip this poison message and seek to next message
SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
+
+ if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
+ continueFlag = false;
+ }
}
}