You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/06/29 13:51:23 UTC

[camel] branch camel-3.20.x updated: CAMEL-19285: prevent Kafka client from entering an endless loop

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.20.x by this push:
     new d6477f57174 CAMEL-19285: prevent Kafka client from entering an endless loop
d6477f57174 is described below

commit d6477f571747206160c8fd7835c1080400355dea
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 29 13:00:36 2023 +0200

    CAMEL-19285: prevent Kafka client from entering an endless loop
    
    When Kafka throws an authentication error, the client could enter an endless reconnect loop, potentially affecting the Kafka broker due to excessive connection retries in a short period.
    
    This change prevents authentication-related errors from causing that endless loop.
---
 .../consumer/errorhandler/ReconnectErrorStrategy.java  | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
index 77036a9aa07..6af2781f01c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.errorhandler;
 
 import org.apache.camel.component.kafka.KafkaFetchRecords;
 import org.apache.camel.component.kafka.PollExceptionStrategy;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,11 +44,19 @@ public class ReconnectErrorStrategy implements PollExceptionStrategy {
 
     @Override
     public void handle(long partitionLastOffset, Exception exception) {
-        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
+        if (exception instanceof AuthenticationException) {
+            LOG.warn("Kafka reported a non-recoverable authentication error. The client will not reconnect");
+
+            // disable reconnect: authentication errors are non-recoverable
+            recordFetcher.setReconnect(false);
+            recordFetcher.setConnected(false);
+        } else {
+            LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
 
-        // re-connect so the consumer can try the same message again
-        recordFetcher.setReconnect(true);
-        recordFetcher.setConnected(false);
+            // re-connect so the consumer can try the same message again
+            recordFetcher.setReconnect(true);
+            recordFetcher.setConnected(false);
+        }
 
         // to close the current consumer
         retry = false;