You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2023/02/12 20:03:36 UTC

[kafka] branch 3.3 updated: KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 6a86e54c764 KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)
6a86e54c764 is described below

commit 6a86e54c7645c3db950640b10340ff347a0bc1a2
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sun Feb 12 19:49:12 2023 +0000

    KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)
    
    We currently cache login managers in static maps, both for static JAAS config specified using a system property and for JAAS config specified using the Kafka config sasl.jaas.config. In addition to the JAAS config, the login manager callback handler is included in the key, but all other configs are ignored. This implementation is based on the assumption that clients that require different logins (e.g. username/password) use different JAAS configs, because login properties are included in the JAAS config ra [...]
    
    This PR includes all configs prefixed with sasl. in the key, so that logins are shared only if all the SASL configs are identical.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Kirk True <ki...@mustardgrain.com>
---
 .../security/authenticator/LoginManager.java       | 17 ++++++---
 .../security/authenticator/LoginManagerTest.java   | 43 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 6613fd147f8..f84a8ac9b75 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -99,14 +99,14 @@ public class LoginManager {
             LoginManager loginManager;
             Password jaasConfigValue = jaasContext.dynamicJaasConfig();
             if (jaasConfigValue != null) {
-                LoginMetadata<Password> loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass);
+                LoginMetadata<Password> loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs);
                 loginManager = DYNAMIC_INSTANCES.get(loginMetadata);
                 if (loginManager == null) {
                     loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
                     DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
                 }
             } else {
-                LoginMetadata<String> loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass);
+                LoginMetadata<String> loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs);
                 loginManager = STATIC_INSTANCES.get(loginMetadata);
                 if (loginManager == null) {
                     loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
@@ -198,17 +198,23 @@ public class LoginManager {
         final T configInfo;
         final Class<? extends Login> loginClass;
         final Class<? extends AuthenticateCallbackHandler> loginCallbackClass;
+        final Map<String, Object> saslConfigs;
 
         LoginMetadata(T configInfo, Class<? extends Login> loginClass,
-                      Class<? extends AuthenticateCallbackHandler> loginCallbackClass) {
+                      Class<? extends AuthenticateCallbackHandler> loginCallbackClass,
+                      Map<String, ?> configs) { // configs: only entries whose key starts with "sasl." are retained below
             this.configInfo = configInfo;
             this.loginClass = loginClass;
             this.loginCallbackClass = loginCallbackClass;
+            this.saslConfigs = new HashMap<>(); // fresh map, so the key does not alias the caller's config map
+            configs.entrySet().stream()
+                    .filter(e -> e.getKey().startsWith("sasl."))
+                    .forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); // value may be null; NOTE(review): presumably why Collectors.toMap (which rejects null values) is not used here
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(configInfo, loginClass, loginCallbackClass);
+            return Objects.hash(configInfo, loginClass, loginCallbackClass, saslConfigs); // saslConfigs is hashed too, matching its use in equals()
         }
 
         @Override
@@ -219,7 +225,8 @@ public class LoginManager {
             LoginMetadata<?> loginMetadata = (LoginMetadata<?>) o;
             return Objects.equals(configInfo, loginMetadata.configInfo) &&
                    Objects.equals(loginClass, loginMetadata.loginClass) &&
-                   Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass);
+                   Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass) &&
+                   Objects.equals(saslConfigs, loginMetadata.saslConfigs);
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
index 92edfae77f9..3e3300e78e4 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.JaasContext;
@@ -109,6 +110,48 @@ public class LoginManagerTest {
         verifyLoginManagerRelease(staticScramLogin, 2, scramJaasContext, configs);
     }
 
+    @Test
+    public void testLoginManagerWithDifferentConfigs() throws Exception { // checks that cached login managers are keyed on sasl.* configs but not on other configs
+        Map<String, Object> configs1 = new HashMap<>();
+        configs1.put("sasl.jaas.config", dynamicPlainContext);
+        configs1.put("client.id", "client");
+        configs1.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://host1:1234");
+        Map<String, Object> configs2 = new HashMap<>(configs1); // same as configs1 except for the token endpoint URL set on the next line
+        configs2.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://host2:1234");
+        JaasContext dynamicContext = JaasContext.loadClientContext(configs1);
+        Map<String, Object> configs3 = new HashMap<>(configs1); // same as configs1 except for client.id, which does not start with "sasl."
+        configs3.put("client.id", "client3");
+
+        LoginManager dynamicLogin1 = LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs1);
+        LoginManager dynamicLogin2 = LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs2);
+
+        assertEquals(dynamicPlainContext, dynamicLogin1.cacheKey());
+        assertEquals(dynamicPlainContext, dynamicLogin2.cacheKey());
+        assertNotSame(dynamicLogin1, dynamicLogin2); // same JAAS config, but a differing sasl.* config must yield a separate login manager
+
+        assertSame(dynamicLogin1, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs1));
+        assertSame(dynamicLogin2, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs2));
+        assertSame(dynamicLogin1, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, new HashMap<>(configs1))); // an equal copy of configs1 resolves to the same instance
+        assertSame(dynamicLogin1, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs3)); // a client.id difference alone does not produce a new instance
+
+
+        JaasContext staticContext = JaasContext.loadClientContext(Collections.emptyMap()); // no sasl.jaas.config supplied; NOTE(review): presumably resolves to the static JAAS configuration set up elsewhere in this test class - confirm
+        LoginManager staticLogin1 = LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs1);
+        LoginManager staticLogin2 = LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs2);
+        assertNotSame(staticLogin1, dynamicLogin1);
+        assertNotSame(staticLogin2, dynamicLogin2);
+        assertNotSame(staticLogin1, staticLogin2);
+        assertSame(staticLogin1, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs1));
+        assertSame(staticLogin2, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs2));
+        assertSame(staticLogin1, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, new HashMap<>(configs1)));
+        assertSame(staticLogin1, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs3));
+
+        verifyLoginManagerRelease(dynamicLogin1, 4, dynamicContext, configs1); // 4 = acquisitions above that returned dynamicLogin1 (configs1 twice, its copy, configs3)
+        verifyLoginManagerRelease(dynamicLogin2, 2, dynamicContext, configs2); // 2 = acquisitions above that returned dynamicLogin2
+        verifyLoginManagerRelease(staticLogin1, 4, staticContext, configs1); // 4 = acquisitions above that returned staticLogin1
+        verifyLoginManagerRelease(staticLogin2, 2, staticContext, configs2); // 2 = acquisitions above that returned staticLogin2
+    }
+
     private void verifyLoginManagerRelease(LoginManager loginManager, int acquireCount, JaasContext jaasContext,
                                            Map<String, ?> configs) throws Exception {