Posted to dev@kafka.apache.org by Shankar Bhaskaran <ma...@gmail.com> on 2021/10/27 16:41:02 UTC

Re: Issue with Kafka consumers and producers on wildfly and SASL

Hi,

I have a fix for this issue. How should I submit a patch?

Regards,
Shankar

On Mon, Aug 30, 2021 at 3:40 AM Shankar Bhaskaran <ma...@gmail.com>
wrote:

> Hi,
>
> I have set up a Kafka cluster on my Linux machine, secured with Keycloak
> using the OAUTHBEARER mechanism. I can use the Kafka console consumers and
> producers to send and receive messages.
>
>
>
> I have tried to connect to Kafka from my consumers and producers deployed
> as modules on the WildFly app server (version 19, Java 11). I have set up
> all the required configuration (see the Config section at the bottom).
>
>
> The sasl.jaas.config value provided as a consumer config option looks
> like this:
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
> required LoginStringClaim_sub='kafka-client';
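
For reference, a minimal sketch of how the security-related settings
described here could be assembled in code. The values are copied from the
Config section at the bottom of this message; the class name
OAuthBearerClientConfig is hypothetical, and the fully qualified
org.apache.kafka... login module name is taken from the class-loader log
lines further down. It only builds a java.util.Properties object and does
not open a connection.

```java
import java.util.Properties;

public class OAuthBearerClientConfig {

    /** Builds the security-related consumer settings quoted in this thread. */
    static Properties securityProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "i-10-134-194-96:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");
        // JAAS entry as quoted in the message; the option name is the poster's own.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule "
                        + "required LoginStringClaim_sub='kafka-client';");
        // Custom handlers from the poster's OAuth module.
        props.put("sasl.login.callback.handler.class",
                "com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler");
        props.put("sasl.client.callback.handler.class",
                "com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler");
        props.put("ssl.truststore.location",
                "/opt/Landmark/new_certs/securityserver.keystore");
        return props;
    }

    public static void main(String[] args) {
        // Print every setting so it can be compared with the ConsumerConfig dump.
        securityProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The same Properties object would normally be passed to the KafkaConsumer or
KafkaProducer constructor together with the deserializer or serializer
settings.
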
>
>
>
> I am able to get authenticated with the broker, but in the client
> callback I am getting an "Unsupported callback" error. I have 3 modules
> in WildFly:
>
> 1) the Kafka producer/consumer code, which depends on
> 2) the OAuth jar (for the login callback handler and login module), which
>    depends on
> 3) the kafka-clients jar (2.8.0)
>
>
>
> I can see that the callback passed to the client callback handler is a
> CredentialCallback instead of an OAuthBearerTokenCallback. The SASL client
> is getting set to AbstractSaslClient instead of OAuthBearerSaslClient.
>
>
>
> Can I get any pointers on this one?
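
One possible way to narrow this down, offered as an assumption rather than a
confirmed diagnosis: the stack trace in the logs shows the SASL client
coming from org.wildfly.security.sasl.oauth2, which suggests that a
security provider other than Kafka's is being selected for the OAUTHBEARER
mechanism. javax.security.sasl.Sasl.createSaslClient consults the
registered security providers in preference order, so listing the providers
that offer a SaslClientFactory for a given mechanism can show which one
would win. The class name SaslFactoryLister below is hypothetical; only JDK
classes are used.

```java
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.List;

public class SaslFactoryLister {

    /**
     * Returns "providerName -> factoryClass" for every registered provider
     * that offers a SaslClientFactory for the mechanism, in preference order.
     */
    static List<String> factoriesFor(String mechanism) {
        List<String> result = new ArrayList<>();
        for (Provider provider : Security.getProviders()) {
            Provider.Service service = provider.getService("SaslClientFactory", mechanism);
            if (service != null) {
                result.add(provider.getName() + " -> " + service.getClassName());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String mechanism = args.length > 0 ? args[0] : "OAUTHBEARER";
        List<String> factories = factoriesFor(mechanism);
        if (factories.isEmpty()) {
            System.out.println("No provider offers SaslClientFactory." + mechanism);
        }
        // The first entry is the one the provider lookup would reach first.
        factories.forEach(System.out::println);
    }
}
```

To be meaningful, factoriesFor("OAUTHBEARER") would have to be called from
inside the deployed module, on the same JVM as the consumer, after the
Kafka client has been initialised. Run standalone, it only reports what the
plain JDK registers.
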
>
>
>
> LOGS
>
>
>
> org.apache.kafka.common.errors.SaslAuthenticationException: An error:
> (java.security.PrivilegedActionException:
> javax.security.sasl.SaslException: ELY05176: Unsupported callback [Caused
> by javax.security.auth.callback.UnsupportedCallbackException]) occurred
> when evaluating SASL token received from the Kafka Broker. Kafka Client
> will go to AUTHENTICATION_FAILED state.
>
> Caused by: javax.security.sasl.SaslException: ELY05176: Unsupported
> callback [Caused by
> javax.security.auth.callback.UnsupportedCallbackException]
>
>                 at
> org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:58)
>
>                 at
> org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.oauth2.OAuth2SaslClient.evaluateMessage(OAuth2SaslClient.java:62)
>
>                 at
> org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslParticipant.evaluateMessage(AbstractSaslParticipant.java:219)
>
>                 at
> org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslClient.evaluateChallenge(AbstractSaslClient.java:98)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
>
>                 at
> java.base/java.security.AccessController.doPrivileged(Native Method)
>
>                 at
> java.base/javax.security.auth.Subject.doAs(Subject.java:423)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>
>                 at
> org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>
>                 at
> com.lgc.common.core//com.lgc.dsl.notifications.consumer.DataChangeNoticeKafkaConsumer.poll(DataChangeNoticeKafkaConsumer.java:388)
>
>                 at
> com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.consumeNotification(DataChangeNotificationProducer.java:204)
>
>                 at
> com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.retrieveAndProcessNotificationObject(DataChangeNotificationProducer.java:106)
>
>                 at
> com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.run(DataChangeNotificationProducer.java:75)
>
>                 at java.base/java.lang.Thread.run(Thread.java:834)
>
> Caused by:
> javax.security.auth.callback.UnsupportedCallbackException
>
>
>                 at
> com.lgc.common.koauth//com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler.handle(AuthOBearerSaslClientCallbackHandler.java:91)
>
>                 at
> org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism._private.MechanismUtil.handleCallbacks(MechanismUtil.java:156)
>
>                 at
> org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:56)
>
>                 ... 27 more
>
>
>
>
>
> *Logs where the SSL handshake is successful, but the client callback is a
> CredentialCallback instead of OAuthBearerTokenCallback. The SASL client is
> getting set to AbstractSaslClient instead of OAuthBearerSaslClient*
>
>
>
> 2021-08-29 16:21:25,756 DEBUG [io.undertow.request] (management I/O-1)
> Upgrading request HttpServerExchange{ GET /}
>
> 2021-08-29 16:21:25,760 DEBUG
> [org.apache.kafka.common.network.SslTransportLayer]
> (OWNotificationProducer) [SslTransportLayer channelId=-1
> key=channel=java.nio.channels.SocketChannel[connection-pending
> remote=i-10-134-194-96/10.134.194.96:9093],
> selector=sun.nio.ch.EPollSelectorImpl@50326a63, interestOps=8,
> readyOps=0] SSL handshake completed successfully with peerHost
> 'i-10-134-194-96' peerPort 9093 peerPrincipal 'CN=i-10-134-194-96, OU=Foo,
> O=acme corp, L=Duckburg, ST=Duckburg, C=WD' cipherSuite
> 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
>
> 2021-08-29 16:21:25,765 DEBUG
> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
> (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
> groupId=OpenWorksConsumer] Set SASL client state to
> RECEIVE_APIVERSIONS_RESPONSE
>
> 2021-08-29 16:21:25,766 DEBUG
> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
> (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
> groupId=OpenWorksConsumer] Set SASL client state to SEND_HANDSHAKE_REQUEST
>
> 2021-08-29 16:21:25,767 DEBUG [org.apache.kafka.clients.NetworkClient]
> (kafka-producer-network-thread | CommonKafkaProducer) [Producer
> clientId=CommonKafkaProducer] Give up sending metadata request since no
> node is available
>
> 2021-08-29 16:21:25,767 DEBUG
> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
> (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
> groupId=OpenWorksConsumer] Set SASL client state to
> RECEIVE_HANDSHAKE_RESPONSE
>
> 2021-08-29 16:21:25,768 DEBUG
> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
> (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
> groupId=OpenWorksConsumer] Set SASL client state to INITIAL
>
> 2021-08-29 16:21:25,769 INFO
> [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
> (OWNotificationProducer) The class loaders are as follows ************
> Callbackclienthandler class
> com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler
> ModuleClassLoader for Module "com.lgc.common.koauth" from local module
> loader @6253c26 (finder: local module finder @49049a04 (roots:
> /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
>
> 2021-08-29 16:21:25,778 INFO
> [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
> (OWNotificationProducer) The class loaders are as follows ************
> OAuthBearerTokenCallback class
> org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
> ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1
> from local module loader @6253c26 (finder: local module finder @49049a04
> (roots:
> /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
>
> 2021-08-29 16:21:25,787 INFO
> [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
> (OWNotificationProducer) The class loaders are as follows ************
> OAuthBearerLoginModule class
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
> ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1
> from local module loader @6253c26 (finder: local module finder @49049a04
> (roots:
> /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
>
> 2021-08-29 16:21:25,800 DEBUG
> [org.jboss.jca.core.connectionmanager.pool.validator.ConnectionValidator]
> (ConnectionValidator) Notifying pools, interval: 500
>
> 2021-08-29 16:21:25,800 DEBUG
> [org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject]
> (ConnectionValidator) Checking for connection within frequency
>
> 2021-08-29 16:21:25,796 INFO
> [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
> (OWNotificationProducer) ******************* The callbacls are
> org.wildfly.security.auth.callback.CredentialCallback@2b29cf23 ClassName
> class
> org.wildfly.security.auth.callback.CredentialCallbackModuleClassLoader for
> Module "org.wildfly.security.elytron-private" version 1.11.4.Final from
> local module loader @6253c26 (finder: local module finder @49049a04 (roots:
> /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
>
> 2021-08-29 16:21:25,803 DEBUG
> [org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject]
> (ConnectionValidator) Returning for connection within frequency
>
> 2021-08-29 16:21:25,803 DEBUG
> [org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject]
> (ConnectionValidator) Checking for connection within frequency
>
> 2021-08-29 16:21:25,805 INFO  [org.apache.kafka.common.network.Selector]
> (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
> groupId=OpenWorksConsumer] Failed authentication with i-10-134-194-96/
> 10.134.194.96 (An error: (java.security.PrivilegedActionException:
> javax.security.sasl.SaslException: ELY05176: Unsupported callback [Caused
> by javax.security.auth.callback.UnsupportedCallbackException]) occurred
> when evaluating SASL token received from the Kafka Broker. Kafka Client
> will go to AUTHENTICATION_FAILED state.)
>
> 2021-08-29 16:21:25,809 DEBUG [org.apache.kafka.clients.NetworkClient]
> (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
> groupId=OpenWorksConsumer] Node -1 disconnected.
>
>
>
>
>
> *Config*
>
>
>
> 2021-08-29 16:21:25,119 INFO
> [org.apache.kafka.clients.consumer.ConsumerConfig] (OWNotificationProducer)
> ConsumerConfig values:
>
>                 allow.auto.create.topics = true
>
>                 auto.commit.interval.ms = 5000
>
>                 auto.offset.reset = latest
>
>                 bootstrap.servers = [i-10-134-194-96:9093]
>
>                 check.crcs = true
>
>                 client.dns.lookup = use_all_dns_ips
>
>                 client.id = consumer-OpenWorksConsumer-2
>
>                 client.rack =
>
>                 connections.max.idle.ms = 540000
>
>                 default.api.timeout.ms = 60000
>
>                 enable.auto.commit = false
>
>                 exclude.internal.topics = true
>
>                 fetch.max.bytes = 52428800
>
>                 fetch.max.wait.ms = 500
>
>                 fetch.min.bytes = 1
>
>                 group.id = OpenWorksConsumer
>
>                 group.instance.id = null
>
>                 heartbeat.interval.ms = 3000
>
>                 interceptor.classes = []
>
>                 internal.leave.group.on.close = true
>
>                 internal.throw.on.fetch.stable.offset.unsupported = false
>
>                 isolation.level = read_uncommitted
>
>                 key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
>
>                 max.partition.fetch.bytes = 1048576
>
>                 max.poll.interval.ms = 330000
>
>                 max.poll.records = 100
>
>                 metadata.max.age.ms = 300000
>
>                 metric.reporters = []
>
>                 metrics.num.samples = 2
>
>                 metrics.recording.level = INFO
>
>                 metrics.sample.window.ms = 30000
>
>                 partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor]
>
>                 receive.buffer.bytes = 65536
>
>                 reconnect.backoff.max.ms = 1000
>
>                 reconnect.backoff.ms = 50
>
>                 request.timeout.ms = 30000
>
>                 retry.backoff.ms = 100
>
>                 sasl.client.callback.handler.class = class
> com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler
>
>                 sasl.jaas.config = [hidden]
>
>                 sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>                 sasl.kerberos.min.time.before.relogin = 60000
>
>                 sasl.kerberos.service.name = null
>
>                 sasl.kerberos.ticket.renew.jitter = 0.05
>
>                 sasl.kerberos.ticket.renew.window.factor = 0.8
>
>                 sasl.login.callback.handler.class = class
> com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler
>
>                 sasl.login.class = null
>
>                 sasl.login.refresh.buffer.seconds = 300
>
>                 sasl.login.refresh.min.period.seconds = 60
>
>                 sasl.login.refresh.window.factor = 0.8
>
>                 sasl.login.refresh.window.jitter = 0.05
>
>                 sasl.mechanism = OAUTHBEARER
>
>                 security.protocol = SASL_SSL
>
>                 security.providers = null
>
>                 send.buffer.bytes = 131072
>
>                 session.timeout.ms = 60000
>
>                 socket.connection.setup.timeout.max.ms = 30000
>
>                 socket.connection.setup.timeout.ms = 10000
>
>                 ssl.cipher.suites = null
>
>                 ssl.enabled.protocols = [TLSv1.2]
>
>                 ssl.endpoint.identification.algorithm =
>
>                 ssl.engine.factory.class = null
>
>                 ssl.key.password = null
>
>                 ssl.keymanager.algorithm = SunX509
>
>                 ssl.keystore.certificate.chain = null
>
>                 ssl.keystore.key = null
>
>                 ssl.keystore.location = null
>
>                 ssl.keystore.password = null
>
>                 ssl.keystore.type = JKS
>
>                 ssl.protocol = TLSv1.2
>
>                 ssl.provider = null
>
>                 ssl.secure.random.implementation = null
>
>                 ssl.trustmanager.algorithm = PKIX
>
>                 ssl.truststore.certificates = null
>
>                 ssl.truststore.location =
> /opt/Landmark/new_certs/securityserver.keystore
>
>                 ssl.truststore.password = null
>
>                 ssl.truststore.type = JKS
>
>                 value.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
>
>
>
> Regards,
>
> Shankar
>