You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (Jira)" <ji...@apache.org> on 2022/12/13 08:05:00 UTC

[jira] [Resolved] (CAMEL-18796) camel-kafka: kafka consumer stops in case of an authentication issue

     [ https://issues.apache.org/jira/browse/CAMEL-18796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen resolved CAMEL-18796.
---------------------------------
    Resolution: Fixed

> camel-kafka: kafka consumer stops in case of an authentication issue
> --------------------------------------------------------------------
>
>                 Key: CAMEL-18796
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18796
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.18.0, 3.19.0
>            Reporter: Luca Burgazzoli
>            Assignee: Otavio Rodolfo Piske
>            Priority: Major
>             Fix For: 3.18.5, 3.20.0
>
>
> I'm running in a strange behavior of the camle-kafka component in case of a glitch/temporary authentication issue. Assuming we have the following code:
> {code:java}
> //usr/bin/env jbang "$0" "$@" ; exit $?
> //
> //DEPS io.quarkus.platform:quarkus-camel-bom:2.14.2.Final@pom
> //DEPS org.apache.camel.quarkus:camel-quarkus-kafka
> //DEPS org.apache.camel.quarkus:camel-quarkus-log
> //DEPS org.apache.camel.quarkus:camel-quarkus-direct
> //
> //JAVAC_OPTIONS -parameters
> //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
> //
> import org.apache.camel.ExtendedCamelContext;
> import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
> public class ck extends EndpointRouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         getCamelContext().adapt(ExtendedCamelContext.class)
>             .setErrorHandlerFactory(
>                 deadLetterChannel("direct:dlq")
>             );
>         var kafka = kafka("demo")
>             .brokers("{{test.kafka.broker}}")
>             .autoOffsetReset("earliest")
>             .securityProtocol("SASL_SSL")  
>             .saslMechanism("PLAIN")
>             .saslJaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule required username='{{test.kafka.username}}' password='{{test.kafka.password}}';");      
>         from("direct:dlq")
>             .to("log:dlq?showAll=true&multiline=true");
>         from(kafka)
>             .to("log:kafka?showAll=true&multiline=true");
>     }
> }
> {code}
> What this route is doing is:
> 1. set-up a global error handler (send to a DLQ)
> 2. poll data from a kafka topic
> If for some reason there is a glitch in the authentication machinery, then the KafkaConsumer thread is terminated and no more poll/reconnection attempt are made.
> {code}
> 2022-12-05 21:52:48,728 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
> 2022-12-05 21:52:53,729 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
> 2022-12-05 21:52:58,730 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
> 2022-12-05 21:53:03,731 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
> 2022-12-05 21:53:08,732 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
> 2022-12-05 21:53:09,598 INFO  [org.apa.kaf.com.net.Selector] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Failed re-authentication with broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77 (channelId=2147483647) (Authentication failed: credentials for user could not be verified)
> 2022-12-05 21:53:09,602 ERROR [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Connection to node 2147483647 (broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77:443) failed authentication due to: Authentication failed: credentials for user could not be verified
> 2022-12-05 21:53:09,605 WARN  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exception org.apache.kafka.common.errors.SaslAuthenticationException caught by thread demo-Thread 0 while polling topic demo from kafka: Authentication failed: credentials for user could not be verified: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
> 2022-12-05 21:53:09,609 WARN  [org.apa.cam.com.kaf.con.err.BridgeErrorStrategy] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Deferring processing to the exception handler based on polling exception strategy
> 2022-12-05 21:53:09,624 DEBUG [org.apa.cam.pro.err.DeadLetterChannel] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Failed delivery for (MessageId: 386B9AF6D607152-0000000000000000 on ExchangeId: 386B9AF6D607152-0000000000000000). On delivery attempt: 0 caught: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> direct://dlq Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> log://dlq?multiline=true&showAll=true Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,629 INFO  [dlq] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exchange[
>   Id: 386B9AF6D607152-0000000000000000
>   ExchangePattern: InOnly
>   Properties: {CamelErrorHandlerBridge=true, CamelExceptionCaught=org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified, CamelFailureRouteId=route2, CamelFatalFallbackErrorHandler=[route2], CamelToEndpoint=log://dlq?multiline=true&showAll=true}
>   Headers: {}
>   BodyType: null
>   Body: [Body is null]
>   CaughtExceptionType: org.apache.kafka.common.errors.SaslAuthenticationException  CaughtExceptionMessage: Authentication failed: credentials for user could not be verified  StackTrace: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
> ]
> 2022-12-05 21:53:09,635 INFO  [org.apa.cam.com.kaf.con.err.SeekUtil] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Consumer seeking to next offset 1 to continue polling next message from topic demo on partition 0
> 2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Closing consumer demo-Thread 0
> 2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.con.sup.PartitionAssignmentListener] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) onPartitionsRevoked: demo-Thread 0 from demo
> 2022-12-05 21:53:09,643 INFO  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Terminating KafkaConsumer thread demo-Thread 0 receiving from topic demo
> {code}
>  
> However according to the documentation, if the pollOnError is set to ERROR_HANDLER as in this case (it is the default), the strategy should use Camel’s error handler to process the exception, and afterwards continue to poll next message but this does not seems to be the case.
> This seems to be somehow related to:
> - https://issues.apache.org/jira/browse/CAMEL-17424
> - https://github.com/apache/camel/commit/55df049a96fd8f52265ef7e7a0cc9ca5a28ab6b3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)