You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/06/29 13:50:48 UTC

[camel] branch camel-3.x updated: CAMEL-19285: prevent Kafka client from entering an endless loop

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new 3f9b2020d9f CAMEL-19285: prevent Kafka client from entering an endless loop
3f9b2020d9f is described below

commit 3f9b2020d9f5faef80ae3519f60ed0c5dd4f329a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 29 13:00:36 2023 +0200

    CAMEL-19285: prevent Kafka client from entering an endless loop
    
    When Kafka throws an authentication error, the client could enter an endless reconnect loop, potentially affecting the Kafka broker due to excessive connection retries in a short period.
    
    This change prevents authentication-related errors from causing such an endless reconnect loop.
---
 .../consumer/errorhandler/ReconnectErrorStrategy.java  | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
index 77036a9aa07..6af2781f01c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.errorhandler;
 
 import org.apache.camel.component.kafka.KafkaFetchRecords;
 import org.apache.camel.component.kafka.PollExceptionStrategy;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,11 +44,19 @@ public class ReconnectErrorStrategy implements PollExceptionStrategy {
 
     @Override
     public void handle(long partitionLastOffset, Exception exception) {
-        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
+        if (exception instanceof AuthenticationException) {
+            LOG.warn("Kafka reported a non-recoverable authentication error. The client will not reconnect");
+
+            // disable reconnect: authentication errors are non-recoverable
+            recordFetcher.setReconnect(false);
+            recordFetcher.setConnected(false);
+        } else {
+            LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
 
-        // re-connect so the consumer can try the same message again
-        recordFetcher.setReconnect(true);
-        recordFetcher.setConnected(false);
+            // re-connect so the consumer can try the same message again
+            recordFetcher.setReconnect(true);
+            recordFetcher.setConnected(false);
+        }
 
         // to close the current consumer
         retry = false;