Posted to issues@camel.apache.org by "Luca Burgazzoli (Jira)" <ji...@apache.org> on 2022/12/05 21:11:00 UTC

[jira] [Created] (CAMEL-18796) camel-kafka: kafka consumer stops in case of an authentication issue

Luca Burgazzoli created CAMEL-18796:
---------------------------------------

             Summary: camel-kafka: kafka consumer stops in case of an authentication issue
                 Key: CAMEL-18796
                 URL: https://issues.apache.org/jira/browse/CAMEL-18796
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 3.19.0, 3.18.0
            Reporter: Luca Burgazzoli


I'm running into strange behavior in the camel-kafka component when there is a glitch or a temporary authentication issue. Assuming we have the following code:

{code:java}
//usr/bin/env jbang "$0" "$@" ; exit $?
//
//DEPS io.quarkus.platform:quarkus-camel-bom:2.14.2.Final@pom
//DEPS org.apache.camel.quarkus:camel-quarkus-kafka
//DEPS org.apache.camel.quarkus:camel-quarkus-log
//DEPS org.apache.camel.quarkus:camel-quarkus-direct
//
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//

import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.endpoint.EndpointRouteBuilder;

public class ck extends EndpointRouteBuilder {

    @Override
    public void configure() throws Exception {
        getCamelContext().adapt(ExtendedCamelContext.class)
            .setErrorHandlerFactory(
                deadLetterChannel("direct:dlq")
            );

        var kafka = kafka("demo")
            .brokers("{{test.kafka.broker}}")
            .autoOffsetReset("earliest")
            .securityProtocol("SASL_SSL")  
            .saslMechanism("PLAIN")
            .saslJaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule required username='{{test.kafka.username}}' password='{{test.kafka.password}}';");      

        from("direct:dlq")
            .to("log:dlq?showAll=true&multiline=true");

        from(kafka)
            .to("log:kafka?showAll=true&multiline=true");
    }
}
{code}

What this route does:
1. sets up a global error handler (send to a DLQ)
2. polls data from a Kafka topic

If for some reason there is a glitch in the authentication machinery, the KafkaConsumer thread is terminated and no further poll or reconnection attempts are made.

{code}
2022-12-05 21:52:48,728 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:52:53,729 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:52:58,730 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:53:03,731 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:53:08,732 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:53:09,598 INFO  [org.apa.kaf.com.net.Selector] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Failed re-authentication with broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77 (channelId=2147483647) (Authentication failed: credentials for user could not be verified)
2022-12-05 21:53:09,602 ERROR [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Connection to node 2147483647 (broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77:443) failed authentication due to: Authentication failed: credentials for user could not be verified
2022-12-05 21:53:09,605 WARN  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exception org.apache.kafka.common.errors.SaslAuthenticationException caught by thread demo-Thread 0 while polling topic demo from kafka: Authentication failed: credentials for user could not be verified: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified

2022-12-05 21:53:09,609 WARN  [org.apa.cam.com.kaf.con.err.BridgeErrorStrategy] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Deferring processing to the exception handler based on polling exception strategy
2022-12-05 21:53:09,624 DEBUG [org.apa.cam.pro.err.DeadLetterChannel] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Failed delivery for (MessageId: 386B9AF6D607152-0000000000000000 on ExchangeId: 386B9AF6D607152-0000000000000000). On delivery attempt: 0 caught: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> direct://dlq Exchange[386B9AF6D607152-0000000000000000]
2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> log://dlq?multiline=true&showAll=true Exchange[386B9AF6D607152-0000000000000000]
2022-12-05 21:53:09,629 INFO  [dlq] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exchange[
  Id: 386B9AF6D607152-0000000000000000
  ExchangePattern: InOnly
  Properties: {CamelErrorHandlerBridge=true, CamelExceptionCaught=org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified, CamelFailureRouteId=route2, CamelFatalFallbackErrorHandler=[route2], CamelToEndpoint=log://dlq?multiline=true&showAll=true}
  Headers: {}
  BodyType: null
  Body: [Body is null]
  CaughtExceptionType: org.apache.kafka.common.errors.SaslAuthenticationException  CaughtExceptionMessage: Authentication failed: credentials for user could not be verified  StackTrace: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified

]
2022-12-05 21:53:09,635 INFO  [org.apa.cam.com.kaf.con.err.SeekUtil] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Consumer seeking to next offset 1 to continue polling next message from topic demo on partition 0
2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Closing consumer demo-Thread 0
2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.con.sup.PartitionAssignmentListener] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) onPartitionsRevoked: demo-Thread 0 from demo
2022-12-05 21:53:09,643 INFO  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Terminating KafkaConsumer thread demo-Thread 0 receiving from topic demo
{code}
 
However, according to the documentation, if pollOnError is set to ERROR_HANDLER, as in this case (it is the default), the strategy should use Camel’s error handler to process the exception and afterwards continue to poll the next message, but this does not seem to be the case.
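To make the mismatch concrete, here is a minimal, self-contained model of the two behaviors: the documented one (hand the exception to the error handler, then keep polling) and the one seen in the log above (hand the exception to the error handler, then terminate the consumer thread). This is only a sketch and not camel-kafka code: the names PollLoopSketch, Action, run and samplePolls are hypothetical, and the real polling loop is more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of a poll loop; it does NOT use or mirror Camel internals.
final class PollLoopSketch {

    /** What the loop does after a failed poll has been handed to the error handler. */
    enum Action { CONTINUE, STOP }

    /** Three simulated polls: a record, an authentication failure, another record. */
    static List<Supplier<String>> samplePolls() {
        return List.of(
            () -> "a",
            () -> { throw new IllegalStateException("auth failed"); },
            () -> "b");
    }

    /** Runs the simulated polls in order and returns the events that were observed. */
    static List<String> run(List<Supplier<String>> polls, Action onError) {
        List<String> events = new ArrayList<>();
        for (Supplier<String> poll : polls) {
            try {
                events.add("record:" + poll.get());
            } catch (RuntimeException e) {
                // Stand-in for routing the exception to the dead letter channel.
                events.add("error-handler:" + e.getMessage());
                if (onError == Action.STOP) {
                    // Stand-in for the consumer thread terminating.
                    events.add("terminated");
                    break;
                }
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Documented ERROR_HANDLER behavior: polling resumes after the failure.
        System.out.println(run(samplePolls(), Action.CONTINUE));
        // Behavior seen in the log: the loop ends and record "b" is never polled.
        System.out.println(run(samplePolls(), Action.STOP));
    }
}
```

With Action.CONTINUE the third poll still happens, which is what the documentation describes. With Action.STOP the loop ends right after the error handler runs, which matches the "Terminating KafkaConsumer thread" line in the log.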

This seems to be somehow related to:
- https://issues.apache.org/jira/browse/CAMEL-17424
- https://github.com/apache/camel/commit/55df049a96fd8f52265ef7e7a0cc9ca5a28ab6b3




