Posted to jira@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2021/12/28 10:13:00 UTC

[jira] [Commented] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown: (Authentication failed: Invalid username or password)

    [ https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466057#comment-17466057 ] 

RivenSun commented on KAFKA-13422:
----------------------------------

Hi [~rsivaram]  [~ijuma] , [~guozhang] 

Can you give any suggestions?
Thanks.

> Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown: (Authentication failed: Invalid username or password)
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13422
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13422
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 2.7.1, 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>         Attachments: CustomerAuthCallbackHandler.java, LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>  
> h1. Foreword:
> When deploying a Kafka cluster on a newer version (2.7.1), I ran into authentication failures on the connections between brokers. The problem can also be reproduced on the current latest version, 3.0.0.
> h1. Reproducing the problem:
> h2. 1) Broker version is 3.0.0
> h3. kafka_server_jaas.conf is identical on every broker; its content is as follows:
>  
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. Broker server.properties:
> The configuration file of one broker is shown below; the files of the other brokers differ only in the localPublicIp value in advertised.listeners.
>  
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=86400000
> delegation.token.max.lifetime.ms=3153600000000
> {code}
>  
>  
> Then start all brokers at the same time. Every broker starts successfully, but authentication fails every time the controller node tries to connect to the brokers. Because the inter-broker connections cannot be established, the entire Kafka cluster is unable to serve clients.
> h3. The server log keeps printing authentication failures:
> The brokers' real IP addresses in the log are replaced with ******.
>  
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Started socket server acceptors and processors (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /****** (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 14:16:20,680] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /****** (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 14:16:21,109] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /****** (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
> {code}
>  
>  
> h2. 2) Change the password that PlainLoginModule uses for inter-broker communication in kafka_server_jaas.conf
> The change is kJTVDziatPgjXG82sFHc4O1EIuewmlvS --> DziatPgjXG82sFHc4O1EIuewmlvS
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="DziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="DziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
>  
> Restart all brokers. Connections between the controller and all brokers are now established normally, which can be verified with the command netstat -anp | grep 9009.
> Real IP addresses are replaced with ******.
>  
> {code:java}
> [root@ip-10-30-0-64 kafka]# netstat -anp | grep 9009
> tcp6       0      0 :::9009                 :::*                    LISTEN      24852/java          
> tcp6       0      0 ******:47502        ******:9009         ESTABLISHED 24852/java          
> tcp6       0      0 ******:9009         ******:41164        ESTABLISHED 24852/java          
> tcp6       0      0 ******:9009         ******:41168        ESTABLISHED 24852/java 
> {code}
>  
> The entire cluster now serves clients normally, which can be verified by creating a topic with the script bin/kafka-topics.sh.
> h1. Preliminary guesses:
> 1) Does the admin password kJTVDziatPgjXG82sFHc4O1EIuewmlvS contain special characters that Kafka does not allow in passwords?
> 2) Does the content of kafka_server_jaas.conf deviate from the standard format described on the Kafka official website?
> 3) Do the line endings in kafka_server_jaas.conf differ from the Linux newline character?
> After consulting the documentation and analyzing the file, none of these guesses turned out to be correct:
> 1) kJTVDziatPgjXG82sFHc4O1EIuewmlvS does not contain any special characters that Kafka disallows in passwords.
> 2) The content of kafka_server_jaas.conf conforms to the standard format on the Kafka official website; see [https://kafka.apache.org/documentation/#security_sasl]
> 3) Opening kafka_server_jaas.conf in vim and running :set list shows that every line ends with the standard Linux newline character.
>  
> To narrow down the cause: since the log keeps printing Invalid username or password, which username and password does ClientBroker actually send, and which username and password does ServerBroker expect?
>  
> h1. Implementing sasl.server.callback.handler.class
>  
> Following the [sasl.server.callback.handler.class|https://kafka.apache.org/documentation/#brokerconfigs_sasl.server.callback.handler.class] parameter, I wrote CustomerAuthCallbackHandler, an implementation of the AuthenticateCallbackHandler interface.
> The code of CustomerAuthCallbackHandler is almost identical to Kafka's default implementation, PlainServerCallbackHandler, except that temporary log output is added to the authenticate method. The complete code is in the attachment "CustomerAuthCallbackHandler.java".
>   
> {code:java}
> private final static Logger log = LoggerFactory.getLogger(CustomerAuthCallbackHandler.class);
> ......
> protected boolean authenticate(String username, char[] password) throws IOException {
>         if (username == null)
>             return false;
>         else {
>             String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,
>                     JAAS_USER_PREFIX + username,
>                     PlainLoginModule.class.getName());
>             boolean authenticateSuccess = expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray());
>             log.info("CustomerAuthCallbackHandler authenticate [{}] | user [{}] password is [{}] , expectedPassword is [{}] ", authenticateSuccess, username, new String(password), expectedPassword);
>             return authenticateSuccess;
>         }
>     }
> {code}
>  
>  
>  
>  
> Add one new line of configuration to each broker's server.properties:
>  
> {code:java}
> listener.name.internal_ssl.plain.sasl.server.callback.handler.class=us.zoom.mq.security.plain.CustomerAuthCallbackHandler
> {code}
>  
> Roll back the admin password in kafka_server_jaas.conf, so that the content is as follows:
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> Restart all brokers and observe the log:
> The brokers' real IP addresses in the log are replaced with ******.
>  
> {code:java}
> [2021-10-29 15:31:09,886] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Started data-plane acceptor and processor(s) for endpoint : ListenerName(SASL_PLAINTEXT) (kafka.network.SocketServer)
> [2021-10-29 15:31:09,925] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAIN_PLUGIN_SSL) (kafka.network.SocketServer)
> [2021-10-29 15:31:09,926] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Started socket server acceptors and processors (kafka.network.SocketServer)
> [2021-10-29 15:31:09,932] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,932] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,932] INFO Kafka startTimeMs: 1635521469926 (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,933] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)
> [2021-10-29 15:31:10,305] INFO CustomerAuthCallbackHandler authenticate [false] | user [admin] password is [admin_scram_password] , expectedPassword is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:10,306] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /****** (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:10,734] INFO CustomerAuthCallbackHandler authenticate [false] | user [admin] password is [admin_scram_password] , expectedPassword is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /****** (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:11,165] INFO CustomerAuthCallbackHandler authenticate [false] | user [admin] password is [admin_scram_password] , expectedPassword is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:11,165] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /****** (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:11,596] INFO CustomerAuthCallbackHandler authenticate [false] | user [admin] password is [admin_scram_password] , expectedPassword is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> {code}
>  
>  
> The log shows clearly why authentication fails: when the connection between brokers is established, the username sent by ClientBroker is correct, but the password does not match expectedPassword. The password that is sent is the one configured for ScramLoginModule in kafka_server_jaas.conf.
> At this point we only know that ClientBroker sends the wrong password, not why. To find the root cause, we have to read the source code of the Kafka server startup process.
>  
> h1. Kafka server Startup process source code analysis
> The startup entry point is the main method of the Scala class kafka.Kafka in the core project.
> It calls KafkaServer's startup() method, which initializes many key modules, such as logManager, socketServer, kafkaController, tokenManager and groupCoordinator.
> The key pieces of code in KafkaServer's startup() method for this problem are as follows:
> 1) socketServer.startup(startProcessingRequests = false)
> This method creates the Acceptor and Processors for the ControlPlane and DataPlane.
>  
> 2) socketServer.startProcessingRequests(authorizerFutures)
> This method starts the Acceptor and Processors for the ControlPlane and DataPlane.
>  
> 3) Trace further into the startAcceptorAndProcessors method in SocketServer
> This method starts all Processor threads for each listener.name.
> See the method startProcessors(processors: Seq[Processor], processorThreadPrefix: String)
>  
> 4) Analyze configureNewConnections() in the run method of the Processor thread class
> This method contains one key line of code: selector.register(connectionId(channel.socket), channel)
> Following that call downwards:
> registerChannel(id, socketChannel, SelectionKey.OP_READ) -->
> KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key) -->
> SaslChannelBuilder.buildChannel(...)
>  
> 5) The buildChannel(...) method of SaslChannelBuilder
> In this method, the value of Mode determines whether buildServerAuthenticator or buildClientAuthenticator is called.
> The most important difference between the two is that the former calls createSaslServer and the latter calls createSaslClient.
> SaslServer is what ServerBroker uses to verify user passwords; for the password verification logic of PlainLoginModule, see the default implementation class PlainServerCallbackHandler.
> SaslClient is what KafkaClient uses to obtain the user's password.
> Our focus should therefore be buildClientAuthenticator, which means the instance variable Mode mode of SaslChannelBuilder must be CLIENT.
> So we can go back and trace the constructor:
> public SaslChannelBuilder(...)
> 6) The constructor public SaslChannelBuilder(...)
> It is only called from the private static ChannelBuilder create(...) method of ChannelBuilders.
> Continue tracing upwards to static ChannelBuilder clientChannelBuilder(...)
> Tracing this far shows that the caller of clientChannelBuilder(...) is either ClientBroker or the ClientUtils class. The latter serves KafkaProducer/KafkaConsumer/KafkaAdminClient.
> To analyze the root cause further, I decided to use KafkaProducer to simulate the connection request of ClientBroker. Their underlying mechanisms are almost the same, except that some configuration items are supported only by ClientBroker and the section name of the static JAAS configuration differs.
> h1. Source code analysis of KafkaProducer's SASL authentication process
>  
> ClientUtils#clientChannelBuilder(...) → ChannelBuilders#create(...) → SaslChannelBuilder#configure(Map<String, ?> configs)
> The key operations performed in these methods are:
> 1) Loading the JaasContext
> If the kafka_Client_jaas.conf file is specified through the java.security.auth.login.config system property, the entries are loaded into JaasContext through the method getAppConfigurationEntry(String var1) of the sun.security.provider.ConfigFile class.
>  
> {code:java}
>  public AppConfigurationEntry[] getAppConfigurationEntry(String var1) {
>         return this.spi.engineGetAppConfigurationEntry(var1);
>     }
> {code}
>  
>  
> 2) createClientCallbackHandler(Map<String, ?> configs)
> The default ClientCallbackHandler implementation is SaslClientCallbackHandler. Its role is to let KafkaClient take the username and password from the credentials stored in the Subject held by the LoginManager of its own Channel. *This is particularly critical, and we will analyze it in detail later.*
> 3) Initialization of LoginManager and loading of LoginContext
> Here we trace the constructor of LoginManager directly:
> private LoginManager(...) → AbstractLogin#login()
> The AbstractLogin#login() method performs two key operations:
>  
> {code:java}
>     @Override
>     public LoginContext login() throws LoginException {
>         loginContext = new LoginContext(contextName, null, loginCallbackHandler, configuration);
>         loginContext.login();
>         log.info("Successfully logged in.");
>         return loginContext;
>     }
> {code}
>  
> new LoginContext(...) does the following:
> (1) Reads the configuration held in JaasContext, where all the credentials of kafka_Client_jaas.conf are stored
> (2) Initializes the instance variable moduleStack (an array)
>  
> loginContext.login() → invokePriv(LOGIN_METHOD) → invoke(String methodName)
> These methods do the following:
> (1) Populate the Object module field of each element of the moduleStack array
> (2) Obtain all public methods of each LoginModule class through reflection
>  
> {code:java}
>   methods = moduleStack[i].module.getClass().getMethods();
> {code}
>  
> It then looks up the index of the initialize method in the methods array through the INIT_METHOD constant
> and executes the initialize method of each LoginModule class through reflection:
>  
> {code:java}
>   methods[mIndex].invoke(moduleStack[i].module, initArgs);
> {code}
>  
> This step is very important. In invokePriv(LOGIN_METHOD), LoginContext loops over moduleStack and executes the initialize method of every LoginModule class you configured. The initialize methods of PlainLoginModule and ScramLoginModule load the username and password configured in kafka_Client_jaas.conf into the Subject of the LoginManager that corresponds to the Channel. Both fields that store usernames and passwords in the Subject are of type SecureSet, and SecureSet stores its elements in a LinkedList. *So the key point: the elements of both credential sets in the Subject are added in the order in which the LoginModules appear in kafka_Client_jaas.conf.*
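A minimal, Kafka-free sketch of the ordering described above. DemoLoginModule, the in-memory Configuration and the placeholder passwords are hypothetical stand-ins (they are not Kafka classes, and not the values from this report). The module only imitates the behavior described above: it copies its username and password options into the Subject inside initialize().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;

class SubjectOrderDemo {

    /** Hypothetical module: copies its options into the Subject in initialize(). */
    public static class DemoLoginModule implements LoginModule {
        public DemoLoginModule() { }

        @Override
        public void initialize(Subject subject, CallbackHandler handler,
                               Map<String, ?> sharedState, Map<String, ?> options) {
            subject.getPublicCredentials().add(options.get("username"));
            subject.getPrivateCredentials().add(options.get("password"));
        }

        @Override public boolean login() { return true; }
        @Override public boolean commit() { return true; }
        @Override public boolean abort() { return false; }
        @Override public boolean logout() { return true; }
    }

    private static AppConfigurationEntry entry(String user, String password) {
        Map<String, String> options = new HashMap<>();
        options.put("username", user);
        options.put("password", password);
        return new AppConfigurationEntry(DemoLoginModule.class.getName(),
                LoginModuleControlFlag.REQUIRED, options);
    }

    /** Logs in with two modules configured in one section and returns the Subject. */
    static Subject loginWithTwoModules() {
        // In-memory stand-in for a JAAS file with two LoginModules in one section.
        Configuration config = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] {
                    entry("admin", "plain-password"),
                    entry("admin_scram", "scram-password")
                };
            }
        };
        // LoginContext loads module classes by name through the thread context
        // class loader, so make sure that loader can see DemoLoginModule.
        Thread.currentThread().setContextClassLoader(SubjectOrderDemo.class.getClassLoader());
        try {
            LoginContext context = new LoginContext("KafkaClient", null, null, config);
            context.login();
            return context.getSubject();
        } catch (LoginException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Iterates the Subject's own credential set, i.e. the stored order. */
    static List<Object> privateCredentialsInStoredOrder(Subject subject) {
        return new ArrayList<>(subject.getPrivateCredentials());
    }

    public static void main(String[] args) {
        Subject subject = loginWithTwoModules();
        System.out.println(new ArrayList<>(subject.getPublicCredentials()));
        System.out.println(privateCredentialsInStoredOrder(subject));
    }
}
```

Both modules end up in the same Subject, and the untyped credential sets list their entries in module order.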
> 4) Let's go back to the SaslClientCallbackHandler mentioned in step 2) above
> Where is this class used? When KafkaClient is about to establish a connection, ClientFactoryImpl#createSaslClient(...) uses SaslClientCallbackHandler.
> The call stack is:
> NetworkClient#initiateConnect(Node node, long now)
> -->
> Selector#connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
> -->
> Selector#registerChannel(String id, SocketChannel socketChannel, int interestedOps)
> -->
> Selector#buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key)
> -->
> SaslChannelBuilder#buildChannel(...) --> SaslChannelBuilder#buildClientAuthenticator(...)
> -->
> SaslClientAuthenticator#createSaslClient()
> -->
> Sasl.createSaslClient(...)
> -->
> ClientFactoryImpl#createSaslClient(...)
> The source code of ClientFactoryImpl#createSaslClient(...) is as follows:
> {code:java}
> public SaslClient createSaslClient(String[] var1, String var2, String var3, String var4, Map<String, ?> var5, CallbackHandler var6) throws SaslException {
>     for(int var7 = 0; var7 < var1.length; ++var7) {
>         if (var1[var7].equals(myMechs[0]) && PolicyUtils.checkPolicy(mechPolicies[0], var5)) {
>             return new ExternalClient(var2);
>         }
>         Object[] var8;
>         if (var1[var7].equals(myMechs[1]) && PolicyUtils.checkPolicy(mechPolicies[1], var5)) {
>             var8 = this.getUserInfo("CRAM-MD5", var2, var6);
>             return new CramMD5Client((String)var8[0], (byte[])((byte[])var8[1]));
>         }
>         if (var1[var7].equals(myMechs[2]) && PolicyUtils.checkPolicy(mechPolicies[2], var5)) {
>             var8 = this.getUserInfo("PLAIN", var2, var6);
>             return new PlainClient(var2, (String)var8[0], (byte[])((byte[])var8[1]));
>         }
>     }
>     return null;
> }
> {code}
>  
>  
> The CallbackHandler passed in from the upper layer is the parameter var6; it is in fact Kafka's own SaslClientCallbackHandler. Looking next at the source code of getUserInfo, you can see that it constructs a NameCallback and a PasswordCallback and then calls the handle method of SaslClientCallbackHandler: var3.handle(new Callback[]\{var6, var7});
>  
> {code:java}
> private Object[] getUserInfo(String var1, String var2, CallbackHandler var3) throws SaslException {
>         if (var3 == null) {
>             throw new SaslException("Callback handler to get username/password required");
>         } else {
>             try {
>                 String var4 = var1 + " authentication id: ";
>                 String var5 = var1 + " password: ";
>                 NameCallback var6 = var2 == null ? new NameCallback(var4) : new NameCallback(var4, var2);
>                 PasswordCallback var7 = new PasswordCallback(var5, false);
>                 var3.handle(new Callback[]{var6, var7});
>                 char[] var8 = var7.getPassword();
>                 byte[] var9;
>                 if (var8 != null) {
>                     var9 = (new String(var8)).getBytes("UTF8");
>                     var7.clearPassword();
>                 } else {
>                     var9 = null;
>                 }
>                 String var10 = var6.getName();
>                 return new Object[]{var10, var9};
>             } catch (IOException var11) {
>                 throw new SaslException("Cannot get password", var11);
>             } catch (UnsupportedCallbackException var12) {
>                 throw new SaslException("Cannot get userid/password", var12);
>             }
>         }
>     }
> {code}
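The callback round trip in getUserInfo can be tried in isolation. The sketch below is a simplified imitation, not the JDK source: fixedHandler is a hypothetical handler that always answers with the credentials it was given, standing in for SaslClientCallbackHandler.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

class UserInfoCallbackDemo {

    /** Hypothetical handler that answers every request with fixed credentials. */
    static CallbackHandler fixedHandler(String user, String password) {
        return callbacks -> {
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    ((NameCallback) callback).setName(user);
                } else if (callback instanceof PasswordCallback) {
                    ((PasswordCallback) callback).setPassword(password.toCharArray());
                } else {
                    throw new UnsupportedCallbackException(callback);
                }
            }
        };
    }

    /** Same shape as getUserInfo: ask the handler, then read both callbacks back. */
    static Object[] getUserInfo(String mechanism, CallbackHandler handler) {
        NameCallback name = new NameCallback(mechanism + " authentication id: ");
        PasswordCallback pass = new PasswordCallback(mechanism + " password: ", false);
        try {
            handler.handle(new Callback[] {name, pass});
        } catch (IOException | UnsupportedCallbackException e) {
            throw new IllegalStateException("Cannot get userid/password", e);
        }
        char[] chars = pass.getPassword();
        byte[] bytes = chars == null
                ? null
                : new String(chars).getBytes(StandardCharsets.UTF_8);
        pass.clearPassword();
        return new Object[] {name.getName(), bytes};
    }

    public static void main(String[] args) {
        Object[] info = getUserInfo("PLAIN", fixedHandler("admin", "secret"));
        System.out.println(info[0] + " / " + ((byte[]) info[1]).length + " password bytes");
    }
}
```

Whatever the handler puts into the PasswordCallback is what the PLAIN client sends, so the handler alone decides which stored credential goes on the wire.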
>  
>  
>  
> The most important part of SaslClientCallbackHandler is its handle(Callback[] callbacks) method, which takes the username and password out of the Subject.
> The source code shows that when the Subject returns the elements of a credential set by class, *it copies the matching elements from the SecureSet into a HashSet*, and the caller then uses *HashSet<String>.iterator().next()* to take *the first* element of that HashSet as the username or password for the corresponding Callback.
>  
> {code:java}
>  for (Callback callback : callbacks) {
>             if (callback instanceof NameCallback) {
>                 NameCallback nc = (NameCallback) callback;
>                 if (subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
>                     nc.setName(subject.getPublicCredentials(String.class).iterator().next());
>                 } else
>                     nc.setName(nc.getDefaultName());
>             } else if (callback instanceof PasswordCallback) {
>                 if (subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
>                     char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
>                     ((PasswordCallback) callback).setPassword(password);
>                 } else {
>                     String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
>                              " client code does not currently support obtaining a password from the user.";
>                     throw new UnsupportedCallbackException(callback, errorMessage);
>                 }
>             }
>             
>       .......
>       }
> {code}
>  
>  
> *The key point: when KafkaClient takes the username and password from the Subject, it always takes the first element of the HashSet<String> built from each credential set, and the iteration order of a HashSet<String> depends on the elements'* {color:#ff0000}*hash()*{color} *values, not on insertion order.*
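The two views of the same credentials can be compared directly with the JDK's Subject class. This is a small sketch; the class and helper names are hypothetical. storedOrder iterates the Subject's own set, while firstFromTypedView repeats the getPrivateCredentials(String.class).iterator().next() call from the handle() code above. Which password the typed view returns first depends on the strings' hash codes, so the sketch prints it rather than assuming it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.security.auth.Subject;

class SubjectCredentialViewDemo {

    /** Adds the passwords in the given order, as successive LoginModules would. */
    static Subject subjectWith(String... passwordsInModuleOrder) {
        Subject subject = new Subject();
        for (String password : passwordsInModuleOrder) {
            subject.getPrivateCredentials().add(password);
        }
        return subject;
    }

    /** The Subject's own set: iterates in the order the elements were added. */
    static List<Object> storedOrder(Subject subject) {
        return new ArrayList<>(subject.getPrivateCredentials());
    }

    /** The typed view: a separate set whose first element is hash-dependent. */
    static String firstFromTypedView(Subject subject) {
        Set<String> typed = subject.getPrivateCredentials(String.class);
        return typed.iterator().next();
    }

    public static void main(String[] args) {
        Subject subject = subjectWith(
                "kJTVDziatPgjXG82sFHc4O1EIuewmlvS", // PlainLoginModule password
                "admin_scram_password");            // ScramLoginModule password
        System.out.println("stored order:    " + storedOrder(subject));
        System.out.println("picked password: " + firstFromTypedView(subject));
    }
}
```

If the picked password is not the first stored one, the PLAIN client sends the wrong module's password, which is the failure reported here.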
> h1. Debugging the KafkaProducer SASL authentication process
>  
> h2. Preconditions:
> h3. 1) kafka_server_jaas.conf configuration:
> Every broker uses this file as its java.security.auth.login.config. The brokers start normally, the Kafka cluster is healthy, and it can serve clients.
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. 2) Key KafkaProducer configuration
> {code:java}
> // Only the key settings are shown; other producer settings are omitted.
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "******:9669");
> props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
> System.setProperty("java.security.auth.login.config","******\\kafka_Client_jaas.conf");
> props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
> props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "******\\client.truststore.jks");
> props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "******");
> props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
> KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> {code}
>  
> h3. 3) kafka_Client_jaas.conf file, simulating the JAAS configuration of ClientBroker
>  
> {code:java}
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="DziatPgjXG82sFHc4O1EIuewmlvS";
>  
> };
> {code}
>  
>  
> h3. Debugging the authentication process
> h4. 1) Place a breakpoint on the line “loginSucceeded = true” in the login() method of LoginContext
> For the debugger screenshot, see the attachment LoginContext_login_debug.png
>   
>  We can see: *The elements of both credential sets in the Subject are added in the order in which the LoginModules appear in kafka_Client_jaas.conf.*
>  
> h4. 2) Place a breakpoint in handle(Callback[] callbacks) of SaslClientCallbackHandler
> For the debugger screenshot, see the attachment SaslClientCallbackHandler_handle_debug.png
>   
>  You can see that the password character array of the PasswordCallback starts with “DziatPgjXG”, which corresponds to the password of ScramLoginModule in kafka_Client_jaas.conf: "DziatPgjXG82sFHc4O1EIuewmlvS"
>  
> h4. 3) Remove all breakpoints and run the KafkaProducer program
> The Producer log is shown below.
> The brokers' real IP addresses in the log are replaced with ******.
> {code:java}
> [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1635534803428
> [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
> [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker ******:9669 (id: -1 rack: null) disconnected
> [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with ******/****** (Authentication failed: Invalid username or password)
> [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -2 (******/******:9669) failed authentication due to: Authentication failed: Invalid username or password
> [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker ******:9669 (id: -2 rack: null) disconnected
> [main] ERROR ProducerTest - the producer has a error:Authentication failed: Invalid username or password
> [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with ******/****** (Authentication failed: Invalid username or password)
> [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -3 (******/******:9669) failed authentication due to: Authentication failed: Invalid username or password
> [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
> [main] ERROR ProducerTest - the producer has a error:Authentication failed: Invalid username or password
> {code}
>  
> The Producer log also reports the exception: Authentication failed: Invalid username or password
> This is because ServerBroker expects the PlainLoginModule password “kJTVDziatPgjXG82sFHc4O1EIuewmlvS”, but KafkaProducer sent the wrong one: it picked the ScramLoginModule password “DziatPgjXG82sFHc4O1EIuewmlvS”.
>  
>  
> h1. Root Cause:
> h2. 1. The Credentials stored in the Subject contain the usernames and passwords of {color:#ff0000}all LoginModules{color} in kafka_Client_jaas.conf
> The constructor of the JDK's LoginContext class is as follows:
> {code:java}
> public LoginContext(String name, Subject subject,
>                         CallbackHandler callbackHandler,
>                         Configuration config) throws LoginException {
>         this.config = config;
>         if (config != null) {
>             creatorAcc = java.security.AccessController.getContext();
>         }
>         init(name);
>        ....
>        }
> {code}
>  
>  
> When the JAAS configuration is supplied through java.security.auth.login.config, the config object here is actually an instance of the JDK's sun.security.provider.ConfigFile.
> When the init(name) call above runs, config.getAppConfigurationEntry(name) reads all the LoginModules in the kafka_Client_jaas.conf file again, and the resulting entries variable is used to size and populate the instance variable moduleStack in the following code.
>  
> {code:java}
> // get the LoginModules configured for this application
>         AppConfigurationEntry[] entries = config.getAppConfigurationEntry(name);
>         if (entries == null) {
>             if (sm != null && creatorAcc == null) {
>                 sm.checkPermission(new AuthPermission
>                                 ("createLoginContext." + OTHER));
>             }
>             entries = config.getAppConfigurationEntry(OTHER);
>             if (entries == null) {
>                 MessageFormat form = new MessageFormat(ResourcesMgr.getString
>                         ("No.LoginModules.configured.for.name"));
>                 Object[] source = {name};
>                 throw new LoginException(form.format(source));
>             }
>         }
>         moduleStack = new ModuleInfo[entries.length];
> {code}
>  
> *Eventually, the* Credentials *stored in the* Subject *will contain the username & password of all* LoginModules *in kafka_Client_jaas.conf.*
> h2. 2. In the JDK's Subject class, elements read from the two kinds of Credentials (public and private) may come back in a different order from the one in which they were stored. The read order depends on the {color:#ff0000}hash index{color} of the elements in the HashSet.
>  
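The ordering effect described above can be observed with a small standalone program. This is only a sketch: the class name CredentialOrderDemo and its helper methods are hypothetical, and the two passwords are the ones quoted in this issue. It stores two String credentials in a Subject, reads them back with getPrivateCredentials(String.class), and prints the first element next to the first element of an insertion-ordered LinkedHashSet. Which element the Subject returns first depends on the hash layout, so no particular output is claimed here.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import javax.security.auth.Subject;

class CredentialOrderDemo {

    // Mimics several LoginModules each adding a password to the same Subject.
    static Subject subjectWith(String... passwords) {
        Subject subject = new Subject();
        for (String password : passwords)
            subject.getPrivateCredentials().add(password);
        return subject;
    }

    // The "take the first element" access pattern discussed in this issue.
    static String firstOf(Set<String> credentials) {
        return credentials.iterator().next();
    }

    // An insertion-ordered copy, used only for comparison.
    static Set<String> insertionOrdered(String... passwords) {
        Set<String> ordered = new LinkedHashSet<>();
        for (String password : passwords)
            ordered.add(password);
        return ordered;
    }

    public static void main(String[] args) {
        String plain = "kJTVDziatPgjXG82sFHc4O1EIuewmlvS";
        String scram = "DziatPgjXG82sFHc4O1EIuewmlvS";

        // A new Set is built on every call; its iteration order is not the storage order.
        Set<String> fetched = subjectWith(plain, scram).getPrivateCredentials(String.class);

        System.out.println("stored first:             " + plain);
        System.out.println("first from Subject:       " + firstOf(fetched));
        System.out.println("first from LinkedHashSet: " + firstOf(insertionOrdered(plain, scram)));
    }
}
```

If the second line printed differs from the first, the Subject returned the SCRAM password ahead of the PLAIN one, which matches the failure reported here.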
> h1. Suggestion & Solutions:
>  
> h2. 1. Modify the constructors of the JaasContext and JaasConfig classes
> h3. 1) Pass clientSaslMechanism to the JaasContext constructor
> When KafkaClient initializes Map<String, JaasContext> jaasContexts, the JaasContext constructor needs to receive the clientSaslMechanism configured by KafkaClient and use it to filter the entries returned by configuration.getAppConfigurationEntry(name). When clientSaslMechanism = PLAIN, the final configurationEntries instance variable should contain only the PlainLoginModule section of kafka_Client_jaas.conf; when clientSaslMechanism = SCRAM-SHA-256, configurationEntries should contain only the ScramLoginModule section.
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration, Password dynamicJaasConfig) {
>         this.name = name;
>         this.type = type;
>         this.configuration = configuration;
>         AppConfigurationEntry[] entries = configuration.getAppConfigurationEntry(name);
>         if (entries == null)
>             throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration.");
>         
>         //Add here the code that uses clientSaslMechanism to verify and filter entries
>         
>         this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
>         this.dynamicJaasConfig = dynamicJaasConfig;
>     }
> {code}
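The filtering step marked by the comment in the constructor above might look roughly like the following. This is a sketch under assumptions, not Kafka code: the class MechanismEntryFilter, its mechanism-to-module mapping, and the choice to leave entries unfiltered for an unknown mechanism are all hypothetical. Only the JDK class AppConfigurationEntry and the two LoginModule class names from this issue are taken as given.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

class MechanismEntryFilter {

    // Hypothetical mapping from SASL mechanism to the LoginModule class that serves it.
    static String expectedLoginModule(String mechanism) {
        if ("PLAIN".equals(mechanism))
            return "org.apache.kafka.common.security.plain.PlainLoginModule";
        if (mechanism.startsWith("SCRAM-"))
            return "org.apache.kafka.common.security.scram.ScramLoginModule";
        return null; // assumption: unknown mechanisms are not filtered
    }

    // Keeps only the entries whose LoginModule matches the client's mechanism.
    static List<AppConfigurationEntry> filter(AppConfigurationEntry[] entries, String mechanism) {
        String expected = expectedLoginModule(mechanism);
        List<AppConfigurationEntry> kept = new ArrayList<>();
        for (AppConfigurationEntry entry : entries) {
            if (expected == null || expected.equals(entry.getLoginModuleName()))
                kept.add(entry);
        }
        if (kept.isEmpty())
            throw new IllegalArgumentException("No LoginModule configured for mechanism " + mechanism);
        return Collections.unmodifiableList(kept);
    }

    // Helper that builds an entry comparable to one section of kafka_Client_jaas.conf.
    static AppConfigurationEntry entry(String loginModule, String username) {
        return new AppConfigurationEntry(loginModule, LoginModuleControlFlag.REQUIRED,
                Collections.singletonMap("username", username));
    }

    public static void main(String[] args) {
        AppConfigurationEntry[] all = {
            entry("org.apache.kafka.common.security.plain.PlainLoginModule", "admin"),
            entry("org.apache.kafka.common.security.scram.ScramLoginModule", "admin_scram")
        };
        for (AppConfigurationEntry kept : filter(all, "PLAIN"))
            System.out.println(kept.getLoginModuleName() + " " + kept.getOptions());
    }
}
```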
> This has two advantages:
>  (1) For mode == Mode.CLIENT, even if the JAAS configuration is supplied through java.security.auth.login.config, the JaasContext of a KafkaClient (KafkaProducer, KafkaConsumer, etc.) gets the same semantics as the broker-side per-mechanism configuration "listener.name.\{listenerName}.\{saslMechanism}.sasl.jaas.config". See [https://kafka.apache.org/documentation/#security_jaas_broker].
>  The goal: in Map<String, JaasContext> jaasContexts, the configurationEntries of the JaasContext for each saslMechanism contain only the LoginModule section for that saslMechanism.
>  (2) For mode == Mode.SERVER, see SaslChannelBuilder#configure(Map<String, ?> configs):
>   
> {code:java}
> public void configure(Map<String, ?> configs) throws KafkaException {
>         try {
>             this.configs = configs;
>             if (mode == Mode.SERVER) {
>                 createServerCallbackHandlers(configs);
>                 createConnectionsMaxReauthMsMap(configs);
>             } else
>                 createClientCallbackHandler(configs);
>             for (Map.Entry<String, AuthenticateCallbackHandler> entry : saslCallbackHandlers.entrySet()) {
>                 String mechanism = entry.getKey();
>                 entry.getValue().configure(configs, mechanism, jaasContexts.get(mechanism).configurationEntries());
>             }
>             ......
>             
>             }
> {code}
> Because the configurationEntries in jaasContexts.get(mechanism) have already been verified and filtered, when entry.getValue().configure(...) runs (in particular PlainServerCallbackHandler.configure(...)), the jaasConfigEntries instance variable of PlainServerCallbackHandler contains only the entries for its own mechanism and no longer the content of other LoginModules.
> h3. 2) The JaasConfig class needs to provide a new constructor
> In the JaasContext constructor, the Configuration configuration parameter is uniformly converted into a JaasConfig. This is needed because we cannot change the behavior of the init(String name) method in the JDK's LoginContext class, which reads every entry from the Configuration it is given.
>  
> {code:java}
> public JaasConfig(String loginContextName, List<AppConfigurationEntry> configurationEntries) {
>     this.loginContextName = loginContextName;
>     if (configurationEntries == null || configurationEntries.isEmpty())
>         throw new IllegalArgumentException("JAAS config property does not contain any login modules");
>     this.configEntries = configurationEntries;
> }
> {code}
> Then, in the JaasContext constructor, rebuild the configuration from the verified and filtered configurationEntries:
>   
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration, Password dynamicJaasConfig) {
>         this.name = name;
>         this.type = type;
>         AppConfigurationEntry[] entries = configuration.getAppConfigurationEntry(name);
>         if (entries == null)
>             throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration.");
>         
>         //Add here the code that uses clientSaslMechanism to verify and filter entries
>         
>         this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
>         
>         // Assign the configuration field exactly once: keep an existing JaasConfig,
>         // otherwise rebuild it from the filtered entries so that LoginContext only sees those.
>         if (configuration instanceof JaasConfig)
>             this.configuration = configuration;
>         else
>             this.configuration = new JaasConfig(name, configurationEntries);
>         
>         this.dynamicJaasConfig = dynamicJaasConfig;
>     }
> {code}
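To illustrate why wrapping the filtered entries in a dedicated Configuration helps, here is a minimal standalone sketch. The class FixedEntriesConfiguration is hypothetical and only stands in for the proposed JaasConfig constructor; it is not Kafka's implementation. Because LoginContext obtains its modules through Configuration.getAppConfigurationEntry(name), a Configuration that holds only the filtered entries cannot hand other LoginModules to the JDK.

```java
import java.util.Collections;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

class FixedEntriesConfiguration extends Configuration {
    private final String loginContextName;
    private final AppConfigurationEntry[] entries;

    FixedEntriesConfiguration(String loginContextName, List<AppConfigurationEntry> entries) {
        if (entries == null || entries.isEmpty())
            throw new IllegalArgumentException("No login modules supplied");
        this.loginContextName = loginContextName;
        this.entries = entries.toArray(new AppConfigurationEntry[0]);
    }

    // Returns the fixed entries for the known context name and null for any other name.
    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        return loginContextName.equals(name) ? entries.clone() : null;
    }

    // Helper building a single PLAIN entry with the username from this issue.
    static AppConfigurationEntry plainEntry() {
        return new AppConfigurationEntry(
                "org.apache.kafka.common.security.plain.PlainLoginModule",
                LoginModuleControlFlag.REQUIRED,
                Collections.singletonMap("username", "admin"));
    }

    public static void main(String[] args) {
        Configuration config = new FixedEntriesConfiguration(
                "KafkaClient", Collections.singletonList(plainEntry()));
        System.out.println("entries for KafkaClient: "
                + config.getAppConfigurationEntry("KafkaClient").length);
        System.out.println("entries for KafkaServer: "
                + config.getAppConfigurationEntry("KafkaServer"));
    }
}
```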
>  
> h2. 2. Each type of LoginModule in kafka_Client_jaas.conf should be configured only once
> For example, loading the following configuration should throw an exception in the JaasContext constructor:
>  
> {code:java}
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin2"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_tom="tom";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="DziatPgjXG82sFHc4O1EIuewmlvS";
>  
> };
> {code}
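A check of this kind could be sketched as follows. The class DuplicateLoginModuleCheck and its method names are hypothetical, and the exception type and message are assumptions; the sketch only shows how a repeated LoginModule class name among the entries of one context can be detected.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

class DuplicateLoginModuleCheck {

    // Throws if the same LoginModule class is configured more than once in a context.
    static void requireUniqueLoginModules(String contextName, AppConfigurationEntry[] entries) {
        Set<String> seen = new HashSet<>();
        for (AppConfigurationEntry entry : entries) {
            // Set.add returns false when the module name was already present.
            if (!seen.add(entry.getLoginModuleName()))
                throw new IllegalArgumentException("JAAS context '" + contextName
                        + "' configures " + entry.getLoginModuleName() + " more than once");
        }
    }

    // Helper that builds an entry comparable to one section of the file above.
    static AppConfigurationEntry entry(String loginModule, String username) {
        return new AppConfigurationEntry(loginModule, LoginModuleControlFlag.REQUIRED,
                Collections.singletonMap("username", username));
    }

    public static void main(String[] args) {
        String plain = "org.apache.kafka.common.security.plain.PlainLoginModule";
        String scram = "org.apache.kafka.common.security.scram.ScramLoginModule";
        AppConfigurationEntry[] entries = {
            entry(plain, "admin"), entry(plain, "admin2"), entry(scram, "admin_scram")
        };
        try {
            requireUniqueLoginModules("KafkaClient", entries);
            System.out.println("configuration accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("configuration rejected: " + e.getMessage());
        }
    }
}
```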
>  
>  
> h2. 3. Discussion of the Subject class in the JDK
> When elements are fetched from the JDK's Subject class, the internal SecureSet is converted into a HashSet. I don't know the JDK's reasoning for this. We could file an issue with the JDK maintainers asking that fetched elements be returned in an order-preserving Set type,
> like SecureSet → SecureSet/LinkedHashSet.
> But I recommend making the change on the application side, at the Kafka level.
> First, for both Map<String, JaasContext> jaasContexts and Map<String, LoginManager> loginManagers, the key is the mechanism. Therefore, *the value corresponding to each mechanism should be purer and should not contain the contents of the LoginModule of other mechanisms*. This also avoids having more than one element in each Credentials set of the Subject.
> Second, when designing Subject, the JDK may not have intended Credentials to carry any "*first element*" semantics. What the JDK promises is to return every element in Credentials and to let you modify the returned Set.
> See the Javadoc of the getPrivateCredentials method:
> {code:java}
> Return a Set of private credentials associated with this Subject that are instances or subclasses of the specified Class.
> The caller must have permission to access all of the requested Credentials, or a SecurityException will be thrown.
> The returned Set is not backed by this Subject's internal private Credential Set. A new Set is created and returned for each method invocation. Modifications to the returned Set will not affect the internal private Credential Set.
> Params:
> c – the returned Set of private credentials will all be instances of this class.
> Type parameters:
> <T> – the type of the class modeled by c
> Returns:
> a Set of private credentials that are instances of the specified Class.
> Throws:
> NullPointerException – if the specified Class is null.
> public <T> Set<T> getPrivateCredentials(Class<T> c) 
> {code}
>  
> However, when Kafka reads from the two Credentials sets in the Subject, it uses "*take the first element*" semantics, which may conflict with the JDK's design of Subject, because all elements of each Credentials set are meant to be available for identity authentication:
> {code:java}
> subject.getPrivateCredentials(String.class).iterator().next()
> {code}
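One way to make the "first element" assumption explicit is to fail when a Subject holds more than one credential of the requested type instead of silently picking one. The following is a sketch only: the class SingleCredential and the method requireSingle are hypothetical, are not part of Kafka or the JDK, and are not proposed as the fix in this issue.

```java
import java.util.Set;
import javax.security.auth.Subject;

class SingleCredential {

    // Returns the only private credential of the given type, or fails loudly
    // when the Subject holds none or more than one.
    static <T> T requireSingle(Subject subject, Class<T> type) {
        Set<T> credentials = subject.getPrivateCredentials(type);
        if (credentials.size() != 1)
            throw new IllegalStateException("Expected exactly one private credential of type "
                    + type.getSimpleName() + " but found " + credentials.size());
        return credentials.iterator().next();
    }

    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.getPrivateCredentials().add("kJTVDziatPgjXG82sFHc4O1EIuewmlvS");
        System.out.println("single credential: " + requireSingle(subject, String.class));

        // A second LoginModule adding its password makes the lookup ambiguous.
        subject.getPrivateCredentials().add("DziatPgjXG82sFHc4O1EIuewmlvS");
        try {
            requireSingle(subject, String.class);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```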
>  
>  
>  


