You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/15 09:26:43 UTC

[kafka] branch trunk updated: MINOR: Support dynamic JAAS config for broker's LoginManager cache (#4568)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 015e224  MINOR: Support dynamic JAAS config for broker's LoginManager cache (#4568)
015e224 is described below

commit 015e224b3d3c8e7bc412686ff22d5e99324b1019
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Feb 15 09:26:34 2018 +0000

    MINOR: Support dynamic JAAS config for broker's LoginManager cache (#4568)
    
    Fix LoginManager caching when sasl.jaas.config is defined for broker and add unit tests.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/common/security/JaasContext.java  |  28 +++--
 .../security/authenticator/LoginManager.java       |  34 ++++--
 .../common/network/SaslChannelBuilderTest.java     |   2 +-
 .../ClientAuthenticationFailureTest.java           |   1 +
 .../security/authenticator/LoginManagerTest.java   | 129 +++++++++++++++++++++
 .../authenticator/SaslAuthenticatorTest.java       |   1 +
 .../authenticator/SaslServerAuthenticatorTest.java |   2 +-
 .../common/security/plain/PlainSaslServerTest.java |   2 +-
 8 files changed, 177 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
index 6afed55..d72f00d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
@@ -64,10 +64,10 @@ public class JaasContext {
             throw new IllegalArgumentException("mechanism should not be null for SERVER");
         String globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
         String listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
-        Password jaasConfigArgs = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG);
-        if (jaasConfigArgs == null && configs.get(SaslConfigs.SASL_JAAS_CONFIG) != null)
+        Password dynamicJaasConfig = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG);
+        if (dynamicJaasConfig == null && configs.get(SaslConfigs.SASL_JAAS_CONFIG) != null)
             LOG.warn("Server config {} should be prefixed with SASL mechanism name, ignoring config", SaslConfigs.SASL_JAAS_CONFIG);
-        return load(Type.SERVER, listenerContextName, globalContextName, jaasConfigArgs);
+        return load(Type.SERVER, listenerContextName, globalContextName, dynamicJaasConfig);
     }
 
     /**
@@ -80,20 +80,20 @@ public class JaasContext {
      */
     public static JaasContext loadClientContext(Map<String, ?> configs) {
         String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
-        Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
-        return load(JaasContext.Type.CLIENT, null, globalContextName, jaasConfigArgs);
+        Password dynamicJaasConfig = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
+        return load(JaasContext.Type.CLIENT, null, globalContextName, dynamicJaasConfig);
     }
 
     static JaasContext load(JaasContext.Type contextType, String listenerContextName,
-                            String globalContextName, Password jaasConfigArgs) {
-        if (jaasConfigArgs != null) {
-            JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
+                            String globalContextName, Password dynamicJaasConfig) {
+        if (dynamicJaasConfig != null) {
+            JaasConfig jaasConfig = new JaasConfig(globalContextName, dynamicJaasConfig.value());
             AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName);
             if (contextModules == null || contextModules.length == 0)
                 throw new IllegalArgumentException("JAAS config property does not contain any login modules");
             else if (contextModules.length != 1)
                 throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module");
-            return new JaasContext(globalContextName, contextType, jaasConfig);
+            return new JaasContext(globalContextName, contextType, jaasConfig, dynamicJaasConfig);
         } else
             return defaultContext(contextType, listenerContextName, globalContextName);
     }
@@ -133,7 +133,7 @@ public class JaasContext {
             throw new IllegalArgumentException(errorMessage);
         }
 
-        return new JaasContext(contextName, contextType, jaasConfig);
+        return new JaasContext(contextName, contextType, jaasConfig, null);
     }
 
     /**
@@ -146,8 +146,9 @@ public class JaasContext {
     private final Type type;
     private final Configuration configuration;
     private final List<AppConfigurationEntry> configurationEntries;
+    private final Password dynamicJaasConfig;
 
-    public JaasContext(String name, Type type, Configuration configuration) {
+    public JaasContext(String name, Type type, Configuration configuration, Password dynamicJaasConfig) {
         this.name = name;
         this.type = type;
         this.configuration = configuration;
@@ -155,6 +156,7 @@ public class JaasContext {
         if (entries == null)
             throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration.");
         this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
+        this.dynamicJaasConfig = dynamicJaasConfig;
     }
 
     public String name() {
@@ -173,6 +175,10 @@ public class JaasContext {
         return configurationEntries;
     }
 
+    public Password dynamicJaasConfig() {
+        return dynamicJaasConfig;
+    }
+
     /**
      * Returns the configuration option for <code>key</code> from this context.
      * If login module name is specified, return option value only from that module.
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index dc264c8..81dc063 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -46,8 +46,8 @@ public class LoginManager {
     private int refCount;
 
     private LoginManager(JaasContext jaasContext, boolean hasKerberos, Map<String, ?> configs,
-                         Password jaasConfigValue) throws IOException, LoginException {
-        this.cacheKey = jaasConfigValue != null ? jaasConfigValue : jaasContext.name();
+                         Object cacheKey) throws IOException, LoginException {
+        this.cacheKey = cacheKey;
         login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
         login.configure(configs, jaasContext);
         login.login();
@@ -57,20 +57,33 @@ public class LoginManager {
      * Returns an instance of `LoginManager` and increases its reference count.
      *
      * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an
-     * existing `LoginManager` for the provided context type and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`,
-     * if available.
+     * existing `LoginManager` for the provided context type. If `jaasContext` was loaded from a dynamic config,
+     * login managers are reused for the same dynamic config value. For `jaasContext` loaded from static JAAS
+     * configuration, login managers are reused for static contexts with the same login context name.
      *
      * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and
      * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
      * complicated to do the latter without making the consumer API more complex.
+     *
+     * @param jaasContext Static or dynamic JAAS context. `jaasContext.dynamicJaasConfig()` is non-null for dynamic context.
+     *                    For static contexts, this may contain multiple login modules if the context type is SERVER.
+     *                    For CLIENT static contexts and dynamic contexts of CLIENT and SERVER, `jaasContext` contains
+     *                    only one login module.
+     * @param saslMechanism SASL mechanism for which login manager is being acquired. For dynamic contexts, the single
+     *                      login module in `jaasContext` corresponds to this SASL mechanism. Hence `Login` class is
+     *                      chosen based on this mechanism.
+     * @param hasKerberos Boolean flag that indicates if Kerberos is enabled for the server listener or client. Since
+     *                    static broker configuration may contain multiple login modules in a login context, KerberosLogin
+     *                    must be used if Kerberos is enabled on the listener, even if `saslMechanism` is not GSSAPI.
+     * @param configs Config options used to configure `Login` if a new login manager is created.
+     *
      */
     public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, boolean hasKerberos,
                                                    Map<String, ?> configs) throws IOException, LoginException {
         synchronized (LoginManager.class) {
-            // SASL_JAAS_CONFIG is only supported by clients
             LoginManager loginManager;
-            Password jaasConfigValue = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
-            if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) {
+            Password jaasConfigValue = jaasContext.dynamicJaasConfig();
+            if (jaasConfigValue != null) {
                 loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue);
                 if (loginManager == null) {
                     loginManager = new LoginManager(jaasContext, saslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM), configs, jaasConfigValue);
@@ -79,7 +92,7 @@ public class LoginManager {
             } else {
                 loginManager = STATIC_INSTANCES.get(jaasContext.name());
                 if (loginManager == null) {
-                    loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue);
+                    loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasContext.name());
                     STATIC_INSTANCES.put(jaasContext.name(), loginManager);
                 }
             }
@@ -95,6 +108,11 @@ public class LoginManager {
         return login.serviceName();
     }
 
+    // Only for testing
+    Object cacheKey() {
+        return cacheKey;
+    }
+
     private LoginManager acquire() {
         ++refCount;
         LOGGER.trace("{} acquired", this);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 6072bf5..26cc544 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -71,7 +71,7 @@ public class SaslChannelBuilderTest {
     private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
-        JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
+        JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null);
         Map<String, JaasContext> jaasContexts = Collections.singletonMap("PLAIN", jaasContext);
         return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"),
                 false, "PLAIN", true, null, null);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 7c028c4..c64597b 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -56,6 +56,7 @@ public class ClientAuthenticationFailureTest {
 
     @Before
     public void setup() throws Exception {
+        LoginManager.closeAll();
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
 
         saslServerConfigs = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
new file mode 100644
index 0000000..8be72fb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertEquals;
+
+public class LoginManagerTest {
+
+    private Password dynamicPlainContext;
+    private Password dynamicDigestContext;
+
+    @Before
+    public void setUp() {
+        dynamicPlainContext = new Password(PlainLoginModule.class.getName() +
+                " required user=\"plainuser\" password=\"plain-secret\";");
+        dynamicDigestContext = new Password(TestDigestLoginModule.class.getName() +
+                " required user=\"digestuser\" password=\"digest-secret\";");
+        TestJaasConfig.createConfiguration("SCRAM-SHA-256",
+                Collections.singletonList("SCRAM-SHA-256"));
+    }
+
+    @After
+    public void tearDown() {
+        LoginManager.closeAll();
+    }
+
+    @Test
+    public void testClientLoginManager() throws Exception {
+        Map<String, ?> configs = Collections.singletonMap("sasl.jaas.config", dynamicPlainContext);
+        JaasContext dynamicContext = JaasContext.loadClientContext(configs);
+        JaasContext staticContext = JaasContext.loadClientContext(Collections.<String, Object>emptyMap());
+
+        LoginManager dynamicLogin = LoginManager.acquireLoginManager(dynamicContext, "PLAIN",
+                false, configs);
+        assertEquals(dynamicPlainContext, dynamicLogin.cacheKey());
+        LoginManager staticLogin = LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256",
+                false, configs);
+        assertNotSame(dynamicLogin, staticLogin);
+        assertEquals("KafkaClient", staticLogin.cacheKey());
+
+        assertSame(dynamicLogin, LoginManager.acquireLoginManager(dynamicContext, "PLAIN",
+                false, configs));
+        assertSame(staticLogin, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256",
+                false, configs));
+
+        verifyLoginManagerRelease(dynamicLogin, 2, dynamicContext, configs);
+        verifyLoginManagerRelease(staticLogin, 2, staticContext, configs);
+    }
+
+    @Test
+    public void testServerLoginManager() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("plain.sasl.jaas.config", dynamicPlainContext);
+        configs.put("digest-md5.sasl.jaas.config", dynamicDigestContext);
+        ListenerName listenerName = new ListenerName("listener1");
+        JaasContext plainJaasContext = JaasContext.loadServerContext(listenerName, "PLAIN", configs);
+        JaasContext digestJaasContext = JaasContext.loadServerContext(listenerName, "DIGEST-MD5", configs);
+        JaasContext scramJaasContext = JaasContext.loadServerContext(listenerName, "SCRAM-SHA-256", configs);
+
+        LoginManager dynamicPlainLogin = LoginManager.acquireLoginManager(plainJaasContext, "PLAIN",
+                false, configs);
+        assertEquals(dynamicPlainContext, dynamicPlainLogin.cacheKey());
+        LoginManager dynamicDigestLogin = LoginManager.acquireLoginManager(digestJaasContext, "DIGEST-MD5",
+                false, configs);
+        assertNotSame(dynamicPlainLogin, dynamicDigestLogin);
+        assertEquals(dynamicDigestContext, dynamicDigestLogin.cacheKey());
+        LoginManager staticScramLogin = LoginManager.acquireLoginManager(scramJaasContext, "SCRAM-SHA-256",
+                false, configs);
+        assertNotSame(dynamicPlainLogin, staticScramLogin);
+        assertEquals("KafkaServer", staticScramLogin.cacheKey());
+
+        assertSame(dynamicPlainLogin, LoginManager.acquireLoginManager(plainJaasContext, "PLAIN",
+                false, configs));
+        assertSame(dynamicDigestLogin, LoginManager.acquireLoginManager(digestJaasContext, "DIGEST-MD5",
+                false, configs));
+        assertSame(staticScramLogin, LoginManager.acquireLoginManager(scramJaasContext, "SCRAM-SHA-256",
+                false, configs));
+
+        verifyLoginManagerRelease(dynamicPlainLogin, 2, plainJaasContext, configs);
+        verifyLoginManagerRelease(dynamicDigestLogin, 2, digestJaasContext, configs);
+        verifyLoginManagerRelease(staticScramLogin, 2, scramJaasContext, configs);
+    }
+
+    private void verifyLoginManagerRelease(LoginManager loginManager, int acquireCount, JaasContext jaasContext,
+                                           Map<String, ?> configs) throws Exception {
+
+        // Release all except one reference and verify that the loginManager is still cached
+        for (int i = 0; i < acquireCount - 1; i++)
+            loginManager.release();
+        assertSame(loginManager, LoginManager.acquireLoginManager(jaasContext, "PLAIN",
+                false, configs));
+
+        // Release all references and verify that new LoginManager is created on next acquire
+        for (int i = 0; i < 2; i++) // release all references
+            loginManager.release();
+        LoginManager newLoginManager = LoginManager.acquireLoginManager(jaasContext, "PLAIN",
+                false, configs);
+        assertNotSame(loginManager, newLoginManager);
+        newLoginManager.release();
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index ef2a075..b8edc61 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -104,6 +104,7 @@ public class SaslAuthenticatorTest {
 
     @Before
     public void setup() throws Exception {
+        LoginManager.closeAll();
         serverCertStores = new CertStores(true, "localhost");
         clientCertStores = new CertStores(false, "localhost");
         saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 72c2969..17d31bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -110,7 +110,7 @@ public class SaslServerAuthenticatorTest {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
         Map<String, JaasContext> jaasContexts = Collections.singletonMap(mechanism,
-                new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig));
+                new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null));
         Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
         return new SaslServerAuthenticator(configs, "node", jaasContexts, subjects, null, new CredentialCache(),
                 new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames()));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
index 4196db6..86baf3e 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
@@ -45,7 +45,7 @@ public class PlainSaslServerTest {
         options.put("user_" + USER_A, PASSWORD_A);
         options.put("user_" + USER_B, PASSWORD_B);
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), options);
-        JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
+        JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null);
         saslServer = new PlainSaslServer(jaasContext);
     }
 

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.