Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/04 17:19:21 UTC

[kafka] branch trunk updated: KAFKA-6246; Dynamic update of listeners and security configs (#4488)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4019b21  KAFKA-6246; Dynamic update of listeners and security configs (#4488)
4019b21 is described below

commit 4019b21d602c0912d9dc0286f10701e7e724f750
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sun Feb 4 09:19:17 2018 -0800

    KAFKA-6246; Dynamic update of listeners and security configs (#4488)
    
    Dynamic update of listeners as described in KIP-226. This includes:
      - Addition of new listeners with listener-prefixed security configs
      - Removal of existing listeners
      - Password encryption
      - sasl.jaas.config property for broker's JAAS config prefixed with listener and mechanism name
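    
    The per-listener, per-mechanism naming convention added by this commit is exercised
    by the tests further down (SaslAuthenticatorTest, AbstractConfigTest). As a minimal
    sketch of broker properties under this scheme, with a hypothetical listener setup and
    placeholder credentials:
    
        import java.util.Properties;
    
        public class ListenerJaasConfigSketch {
            public static void main(String[] args) {
                Properties brokerProps = new Properties();
                // Per-listener, per-mechanism JAAS config; prefix components are lower-cased.
                brokerProps.put("listener.name.sasl_ssl.plain.sasl.jaas.config",
                        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                                + "user_admin=\"admin-secret\";");
                // Other SASL configs can be scoped the same way, e.g. for GSSAPI:
                brokerProps.put("listener.name.sasl_ssl.gssapi.sasl.kerberos.service.name", "kafka");
            }
        }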
---
 checkstyle/import-control.xml                      |   1 +
 .../org/apache/kafka/common/Reconfigurable.java    |   9 +-
 .../apache/kafka/common/config/AbstractConfig.java |  16 +
 .../kafka/common/network/ChannelBuilders.java      |  18 +-
 .../apache/kafka/common/network/ListenerName.java  |   4 +
 .../kafka/common/network/SaslChannelBuilder.java   |  58 +--
 .../kafka/common/network/SslChannelBuilder.java    |   4 +-
 .../apache/kafka/common/security/JaasContext.java  |  84 ++---
 .../security/authenticator/CredentialCache.java    |   8 +-
 .../security/authenticator/LoginManager.java       |   4 +-
 .../authenticator/SaslServerAuthenticator.java     |  23 +-
 .../security/scram/ScramCredentialUtils.java       |   4 +-
 .../kafka/common/security/ssl/SslFactory.java      |   7 +-
 .../kafka/common/config/AbstractConfigTest.java    |  49 +++
 .../common/network/SaslChannelBuilderTest.java     |  20 +-
 .../kafka/common/security/JaasContextTest.java     |  25 +-
 .../authenticator/SaslAuthenticatorTest.java       |  48 ++-
 .../authenticator/SaslServerAuthenticatorTest.java |  13 +-
 .../security/authenticator/TestJaasConfig.java     |  14 +
 .../scala/kafka/controller/KafkaController.scala   |  67 +++-
 .../main/scala/kafka/network/SocketServer.scala    |  56 ++-
 .../scala/kafka/security/CredentialProvider.scala  |   6 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 254 ++++++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  99 +++--
 core/src/main/scala/kafka/server/KafkaServer.scala |   9 +-
 .../main/scala/kafka/utils/PasswordEncoder.scala   | 175 +++++++++
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   7 +
 .../kafka/api/AdminClientIntegrationTest.scala     |   4 +-
 .../scala/integration/kafka/api/SaslSetup.scala    |  16 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 404 +++++++++++++++++++--
 ...pleListenersWithAdditionalJaasContextTest.scala |  24 +-
 ...ltipleListenersWithDefaultJaasContextTest.scala |   9 +-
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  81 +++--
 .../test/scala/unit/kafka/KafkaConfigTest.scala    |  12 +-
 .../scala/unit/kafka/admin/ConfigCommandTest.scala |   4 +-
 .../unit/kafka/network/SocketServerTest.scala      |   5 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 155 ++++++--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  10 +
 .../unit/kafka/utils/PasswordEncoderTest.scala     | 127 +++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   2 +-
 41 files changed, 1574 insertions(+), 363 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5662552..000acc3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -50,6 +50,7 @@
     <disallow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common" exact-match="true" />
     <allow pkg="org.apache.kafka.common.annotation" />
+    <allow pkg="org.apache.kafka.common.config" exact-match="true" />
     <allow pkg="org.apache.kafka.common.internals" exact-match="true" />
     <allow pkg="org.apache.kafka.test" />
 
diff --git a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
index 3339dce..8db9dc2 100644
--- a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
+++ b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common;
 
+import org.apache.kafka.common.config.ConfigException;
+
 import java.util.Map;
 import java.util.Set;
 
@@ -33,9 +35,12 @@ public interface Reconfigurable extends Configurable {
      * Validates the provided configuration. The provided map contains
      * all configs including any reconfigurable configs that may be different
      * from the initial configuration. Reconfiguration will not be performed
-     * if this method returns false or throws any exception.
+     * if this method throws any exception.
+     * @throws ConfigException if the provided configs are not valid. The exception
+     *         message from ConfigException will be returned to the client in
+     *         the AlterConfigs response.
      */
-    boolean validateReconfiguration(Map<String, ?> configs);
+    void validateReconfiguration(Map<String, ?> configs) throws ConfigException;
 
     /**
      * Reconfigures this instance with the given key-value pairs. The provided
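The contract change above is the core of this file: validation now signals failure by
throwing ConfigException instead of returning false, and the exception message is what
the client sees in the AlterConfigs response. A minimal sketch of an implementation
under the new signature; the class and the `example.max.size` config are hypothetical,
while configure, reconfigurableConfigs and reconfigure are the interface's other methods:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.Reconfigurable;
    import org.apache.kafka.common.config.ConfigException;

    public class MaxSizeReconfigurable implements Reconfigurable {
        private volatile int maxSize;

        @Override
        public void configure(Map<String, ?> configs) {
            maxSize = Integer.parseInt(String.valueOf(configs.get("example.max.size")));
        }

        @Override
        public Set<String> reconfigurableConfigs() {
            return Collections.singleton("example.max.size");
        }

        @Override
        public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
            int newSize = Integer.parseInt(String.valueOf(configs.get("example.max.size")));
            if (newSize <= 0)
                // This message is returned to the client in the AlterConfigs response.
                throw new ConfigException("example.max.size must be positive, got " + newSize);
        }

        @Override
        public void reconfigure(Map<String, ?> configs) {
            maxSize = Integer.parseInt(String.valueOf(configs.get("example.max.size")));
        }
    }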
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index ce3ae43..1713d78 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -204,6 +204,16 @@ public class AbstractConfig {
      * put all the remaining keys with the prefix stripped and their parsed values in the result map.
      *
      * This is useful if one wants to allow prefixed configs to override default ones.
+     * <p>
+     * Two forms of prefixes are supported:
+     * <ul>
+     *     <li>listener.name.{listenerName}.some.prop: If the provided prefix is `listener.name.{listenerName}.`,
+     *         the key `some.prop` with the value parsed using the definition of `some.prop` is returned.</li>
+     *     <li>listener.name.{listenerName}.{mechanism}.some.prop: If the provided prefix is `listener.name.{listenerName}.`,
+     *         the key `{mechanism}.some.prop` with the value parsed using the definition of `some.prop` is returned.
+     *          This is used to provide per-mechanism configs for a broker listener (e.g. sasl.jaas.config)</li>
+     * </ul>
+     * </p>
      */
     public Map<String, Object> valuesWithPrefixOverride(String prefix) {
         Map<String, Object> result = new RecordingMap<>(values(), prefix, true);
@@ -213,6 +223,12 @@ public class AbstractConfig {
                 ConfigDef.ConfigKey configKey = definition.configKeys().get(keyWithNoPrefix);
                 if (configKey != null)
                     result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true));
+                else {
+                    String keyWithNoSecondaryPrefix = keyWithNoPrefix.substring(keyWithNoPrefix.indexOf('.') + 1);
+                    configKey = definition.configKeys().get(keyWithNoSecondaryPrefix);
+                    if (configKey != null)
+                        result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true));
+                }
             }
         }
         return result;
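The secondary-prefix lookup added here is what makes `listener.name.{listenerName}.{mechanism}.some.prop`
work: if stripping the listener prefix does not yield a known config key, one more dotted
component (the mechanism) is stripped to find the definition, but the mechanism is kept on
the returned key. A small usage sketch mirroring the new test in AbstractConfigTest below;
TestSecurityConfig is the test helper used there, and its import path is assumed:

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.common.security.TestSecurityConfig;

    public class PrefixOverrideSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("sasl.kerberos.kinit.cmd", "/usr/bin/kinit");
            props.put("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
            TestSecurityConfig config = new TestSecurityConfig(props);
            Map<String, Object> values = config.valuesWithPrefixOverride("listener.name.listener1.");
            // The "gssapi." secondary prefix stays on the key; the value is parsed
            // using the definition of "sasl.kerberos.kinit.cmd".
            System.out.println(values.get("gssapi.sasl.kerberos.kinit.cmd")); // /usr/bin/kinit2
        }
    }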
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index e6df78e..80ccb7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -29,6 +29,9 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class ChannelBuilders {
@@ -105,9 +108,20 @@ public class ChannelBuilders {
             case SASL_SSL:
             case SASL_PLAINTEXT:
                 requireNonNullMode(mode, securityProtocol);
-                JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
+                Map<String, JaasContext> jaasContexts;
+                if (mode == Mode.SERVER) {
+                    List<String> enabledMechanisms = (List<String>) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
+                    jaasContexts = new HashMap<>(enabledMechanisms.size());
+                    for (String mechanism : enabledMechanisms)
+                        jaasContexts.put(mechanism, JaasContext.loadServerContext(listenerName, mechanism, configs));
+                } else {
+                    // Use server context for inter-broker client connections and client context for other clients
+                    JaasContext jaasContext = contextType == JaasContext.Type.CLIENT ? JaasContext.loadClientContext(configs) :
+                            JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
+                    jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
+                }
                 channelBuilder = new SaslChannelBuilder(mode,
-                        jaasContext,
+                        jaasContexts,
                         securityProtocol,
                         listenerName,
                         isInterBrokerListener,
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
index 9da4cca..fc0cb14 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
@@ -71,4 +71,8 @@ public final class ListenerName {
     public String configPrefix() {
         return CONFIG_STATIC_PREFIX + "." + value.toLowerCase(Locale.ROOT) + ".";
     }
+
+    public String saslMechanismConfigPrefix(String saslMechanism) {
+        return configPrefix() + saslMechanism.toLowerCase(Locale.ROOT) + ".";
+    }
 }
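The new helper simply appends the lower-cased mechanism to the existing listener prefix,
so both prefix forms described in AbstractConfig come from one place. For example:

    import org.apache.kafka.common.network.ListenerName;

    public class ListenerPrefixSketch {
        public static void main(String[] args) {
            ListenerName listener = new ListenerName("SASL_SSL");
            System.out.println(listener.configPrefix());
            // listener.name.sasl_ssl.
            System.out.println(listener.saslMechanismConfigPrefix("SCRAM-SHA-256"));
            // listener.name.sasl_ssl.scram-sha-256.
        }
    }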
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 1716e0e..095f826 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -41,6 +41,7 @@ import java.net.Socket;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -55,18 +56,19 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
     private final boolean isInterBrokerListener;
     private final String clientSaslMechanism;
     private final Mode mode;
-    private final JaasContext jaasContext;
+    private final Map<String, JaasContext> jaasContexts;
     private final boolean handshakeRequestEnable;
     private final CredentialCache credentialCache;
     private final DelegationTokenCache tokenCache;
+    private final Map<String, LoginManager> loginManagers;
+    private final Map<String, Subject> subjects;
 
-    private LoginManager loginManager;
     private SslFactory sslFactory;
     private Map<String, ?> configs;
     private KerberosShortNamer kerberosShortNamer;
 
     public SaslChannelBuilder(Mode mode,
-                              JaasContext jaasContext,
+                              Map<String, JaasContext> jaasContexts,
                               SecurityProtocol securityProtocol,
                               ListenerName listenerName,
                               boolean isInterBrokerListener,
@@ -75,7 +77,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                               CredentialCache credentialCache,
                               DelegationTokenCache tokenCache) {
         this.mode = mode;
-        this.jaasContext = jaasContext;
+        this.jaasContexts = jaasContexts;
+        this.loginManagers = new HashMap<>(jaasContexts.size());
+        this.subjects = new HashMap<>(jaasContexts.size());
         this.securityProtocol = securityProtocol;
         this.listenerName = listenerName;
         this.isInterBrokerListener = isInterBrokerListener;
@@ -89,14 +93,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
             this.configs = configs;
-            boolean hasKerberos;
-            if (mode == Mode.SERVER) {
-                @SuppressWarnings("unchecked")
-                List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
-                hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
-            } else {
-                hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
-            }
+            boolean hasKerberos = jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM);
 
             if (hasKerberos) {
                 String defaultRealm;
@@ -110,8 +107,14 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                 if (principalToLocalRules != null)
                     kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
             }
-            this.loginManager = LoginManager.acquireLoginManager(jaasContext, hasKerberos, configs);
-
+            for (Map.Entry<String, JaasContext> entry : jaasContexts.entrySet()) {
+                String mechanism = entry.getKey();
+                // With static JAAS configuration, use KerberosLogin if Kerberos is enabled. With dynamic JAAS configuration,
+                // use KerberosLogin only for the LoginContext corresponding to GSSAPI
+                LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, hasKerberos, configs);
+                loginManagers.put(mechanism, loginManager);
+                subjects.put(mechanism, loginManager.subject());
+            }
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                 // Disable SSL client authentication as we are using SASL authentication
                 this.sslFactory = new SslFactory(mode, "none", isInterBrokerListener);
@@ -129,11 +132,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
     }
 
     @Override
-    public boolean validateReconfiguration(Map<String, ?> configs) {
+    public void validateReconfiguration(Map<String, ?> configs) {
         if (this.securityProtocol == SecurityProtocol.SASL_SSL)
-            return sslFactory.validateReconfiguration(configs);
-        else
-            return true;
+            sslFactory.validateReconfiguration(configs);
     }
 
     @Override
@@ -154,11 +155,13 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
             Socket socket = socketChannel.socket();
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
             Authenticator authenticator;
-            if (mode == Mode.SERVER)
-                authenticator = buildServerAuthenticator(configs, id, transportLayer, loginManager.subject());
-            else
+            if (mode == Mode.SERVER) {
+                authenticator = buildServerAuthenticator(configs, id, transportLayer, subjects);
+            } else {
+                LoginManager loginManager = loginManagers.get(clientSaslMechanism);
                 authenticator = buildClientAuthenticator(configs, id, socket.getInetAddress().getHostName(),
                         loginManager.serviceName(), transportLayer, loginManager.subject());
+            }
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
@@ -168,10 +171,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
 
     @Override
     public void close()  {
-        if (loginManager != null) {
+        for (LoginManager loginManager : loginManagers.values())
             loginManager.release();
-            loginManager = null;
-        }
+        loginManagers.clear();
     }
 
     private TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
@@ -185,8 +187,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
 
     // Visible to override for testing
     protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
-            TransportLayer transportLayer, Subject subject) throws IOException {
-        return new SaslServerAuthenticator(configs, id, jaasContext, subject,
+            TransportLayer transportLayer, Map<String, Subject> subjects) throws IOException {
+        return new SaslServerAuthenticator(configs, id, jaasContexts, subjects,
                 kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer, tokenCache);
     }
 
@@ -198,8 +200,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
     }
 
     // Package private for testing
-    LoginManager loginManager() {
-        return loginManager;
+    Map<String, LoginManager> loginManagers() {
+        return loginManagers;
     }
 
     private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException,
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index e024d32..941c455 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -71,8 +71,8 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
     }
 
     @Override
-    public boolean validateReconfiguration(Map<String, ?> configs) {
-        return sslFactory.validateReconfiguration(configs);
+    public void validateReconfiguration(Map<String, ?> configs) {
+        sslFactory.validateReconfiguration(configs);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
index a13acd2..46db345 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
@@ -41,57 +41,59 @@ public class JaasContext {
     /**
      * Returns an instance of this class.
      *
-     * For contextType SERVER, the context will contain the default Configuration and the context name will be one of:
+     * The context will contain the configuration specified by the JAAS configuration property
+     * {@link SaslConfigs#SASL_JAAS_CONFIG} with prefix `listener.name.{listenerName}.{mechanism}.`
+     * with listenerName and mechanism in lower case. The context `KafkaServer` will be returned
+     * with a single login context entry loaded from the property.
+     * <p>
+     * If the property is not defined, the context will contain the default Configuration and
+     * the context name will be one of:
+     * <ol>
+     *   <li>Lowercased listener name followed by a period and the string `KafkaServer`</li>
+     *   <li>The string `KafkaServer`</li>
+     *  </ol>
+     * If both are valid entries in the default JAAS configuration, the first option is chosen.
+     * </p>
      *
-     * 1. Lowercased listener name followed by a period and the string `KafkaServer`
-     * 2. The string `KafkaServer`
-     *
-     * If both are valid entries in the JAAS configuration, the first option is chosen.
+     * @throws IllegalArgumentException if listenerName or mechanism is not defined.
+     */
+    public static JaasContext loadServerContext(ListenerName listenerName, String mechanism, Map<String, ?> configs) {
+        if (listenerName == null)
+            throw new IllegalArgumentException("listenerName should not be null for SERVER");
+        if (mechanism == null)
+            throw new IllegalArgumentException("mechanism should not be null for SERVER");
+        String globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
+        String listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
+        Password jaasConfigArgs = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG);
+        if (jaasConfigArgs == null && configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
+            LOG.warn("Server config {} should be prefixed with SASL mechanism name, ignoring config", SaslConfigs.SASL_JAAS_CONFIG);
+        return load(Type.SERVER, listenerContextName, globalContextName, jaasConfigArgs);
+    }
+
+    /**
+     * Returns an instance of this class.
      *
-     * For contextType CLIENT, if JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified,
+     * If JAAS configuration property {@link SaslConfigs#SASL_JAAS_CONFIG} is specified,
      * the configuration object is created by parsing the property value. Otherwise, the default Configuration
      * is returned. The context name is always `KafkaClient`.
      *
-     * @throws IllegalArgumentException if JAAS configuration property is specified for contextType SERVER, if
-     * listenerName is not defined for contextType SERVER of if listenerName is defined for contextType CLIENT.
      */
-    public static JaasContext load(JaasContext.Type contextType, ListenerName listenerName,
-                                   Map<String, ?> configs) {
-        String listenerContextName;
-        String globalContextName;
-        switch (contextType) {
-            case CLIENT:
-                if (listenerName != null)
-                    throw new IllegalArgumentException("listenerName should be null for CLIENT");
-                globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
-                listenerContextName = null;
-                break;
-            case SERVER:
-                if (listenerName == null)
-                    throw new IllegalArgumentException("listenerName should not be null for SERVER");
-                globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
-                listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
-                break;
-            default:
-                throw new IllegalArgumentException("Unexpected context type " + contextType);
-        }
-        return load(contextType, listenerContextName, globalContextName, configs);
+    public static JaasContext loadClientContext(Map<String, ?> configs) {
+        String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
+        Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
+        return load(JaasContext.Type.CLIENT, null, globalContextName, jaasConfigArgs);
     }
 
     static JaasContext load(JaasContext.Type contextType, String listenerContextName,
-                            String globalContextName, Map<String, ?> configs) {
-        Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
+                            String globalContextName, Password jaasConfigArgs) {
         if (jaasConfigArgs != null) {
-            if (contextType == JaasContext.Type.SERVER)
-                throw new IllegalArgumentException("JAAS config property not supported for server");
-            else {
-                JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
-                AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(globalContextName);
-                int numModules = clientModules == null ? 0 : clientModules.length;
-                if (numModules != 1)
-                    throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be 1 module");
-                return new JaasContext(globalContextName, contextType, jaasConfig);
-            }
+            JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
+            AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName);
+            if (contextModules == null || contextModules.length == 0)
+                throw new IllegalArgumentException("JAAS config property does not contain any login modules");
+            else if (contextModules.length != 1)
+                throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module");
+            return new JaasContext(globalContextName, contextType, jaasConfig);
         } else
             return defaultContext(contextType, listenerContextName, globalContextName);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
index aa39101..96e7426 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
@@ -16,18 +16,16 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class CredentialCache {
 
-    private final Map<String, Cache<? extends Object>> cacheMap = new HashMap<>();
+    private final ConcurrentHashMap<String, Cache<? extends Object>> cacheMap = new ConcurrentHashMap<>();
 
     public <C> Cache<C> createCache(String mechanism, Class<C> credentialClass) {
         Cache<C> cache = new Cache<C>(credentialClass);
-        cacheMap.put(mechanism, cache);
-        return cache;
+        Cache<C> oldCache = (Cache<C>) cacheMap.putIfAbsent(mechanism, cache);
+        return oldCache == null ? cache : oldCache;
     }
 
     @SuppressWarnings("unchecked")
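Switching to ConcurrentHashMap.putIfAbsent makes createCache safe to call from multiple
listeners being configured concurrently, and idempotent: repeated calls for the same
mechanism hand back the same cache instance. A quick check, assuming Cache is the nested
class shown above:

    import org.apache.kafka.common.security.authenticator.CredentialCache;
    import org.apache.kafka.common.security.scram.ScramCredential;

    public class CredentialCacheSketch {
        public static void main(String[] args) {
            CredentialCache cache = new CredentialCache();
            CredentialCache.Cache<ScramCredential> first =
                    cache.createCache("SCRAM-SHA-256", ScramCredential.class);
            CredentialCache.Cache<ScramCredential> second =
                    cache.createCache("SCRAM-SHA-256", ScramCredential.class);
            System.out.println(first == second); // true
        }
    }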
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index a576e37..dc264c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -64,7 +64,7 @@ public class LoginManager {
      * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
      * complicated to do the latter without making the consumer API more complex.
      */
-    public static LoginManager acquireLoginManager(JaasContext jaasContext, boolean hasKerberos,
+    public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, boolean hasKerberos,
                                                    Map<String, ?> configs) throws IOException, LoginException {
         synchronized (LoginManager.class) {
             // SASL_JAAS_CONFIG is only supported by clients
@@ -73,7 +73,7 @@ public class LoginManager {
             if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) {
                 loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue);
                 if (loginManager == null) {
-                    loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue);
+                    loginManager = new LoginManager(jaasContext, saslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM), configs, jaasConfigValue);
                     DYNAMIC_INSTANCES.put(jaasConfigValue, loginManager);
                 }
             } else {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 6fcca57..ca6e9d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -101,8 +101,8 @@ public class SaslServerAuthenticator implements Authenticator {
     private final SecurityProtocol securityProtocol;
     private final ListenerName listenerName;
     private final String connectionId;
-    private final JaasContext jaasContext;
-    private final Subject subject;
+    private final Map<String, JaasContext> jaasContexts;
+    private final Map<String, Subject> subjects;
     private final CredentialCache credentialCache;
     private final TransportLayer transportLayer;
     private final Set<String> enabledMechanisms;
@@ -128,19 +128,17 @@ public class SaslServerAuthenticator implements Authenticator {
 
     public SaslServerAuthenticator(Map<String, ?> configs,
                                    String connectionId,
-                                   JaasContext jaasContext,
-                                   Subject subject,
+                                   Map<String, JaasContext> jaasContexts,
+                                   Map<String, Subject> subjects,
                                    KerberosShortNamer kerberosNameParser,
                                    CredentialCache credentialCache,
                                    ListenerName listenerName,
                                    SecurityProtocol securityProtocol,
                                    TransportLayer transportLayer,
                                    DelegationTokenCache tokenCache) throws IOException {
-        if (subject == null)
-            throw new IllegalArgumentException("subject cannot be null");
         this.connectionId = connectionId;
-        this.jaasContext = jaasContext;
-        this.subject = subject;
+        this.jaasContexts = jaasContexts;
+        this.subjects = subjects;
         this.credentialCache = credentialCache;
         this.listenerName = listenerName;
         this.securityProtocol = securityProtocol;
@@ -154,6 +152,12 @@ public class SaslServerAuthenticator implements Authenticator {
         if (enabledMechanisms == null || enabledMechanisms.isEmpty())
             throw new IllegalArgumentException("No SASL mechanisms are enabled");
         this.enabledMechanisms = new HashSet<>(enabledMechanisms);
+        for (String mechanism : enabledMechanisms) {
+            if (!jaasContexts.containsKey(mechanism))
+                throw new IllegalArgumentException("Jaas context not specified for SASL mechanism " + mechanism);
+            if (!subjects.containsKey(mechanism))
+                throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + mechanism);
+        }
 
         // Note that the old principal builder does not support SASL, so we do not need to pass the
         // authenticator or the transport layer
@@ -162,8 +166,9 @@ public class SaslServerAuthenticator implements Authenticator {
 
     private void createSaslServer(String mechanism) throws IOException {
         this.saslMechanism = mechanism;
+        Subject subject = subjects.get(mechanism);
         if (!ScramMechanism.isScram(mechanism))
-            callbackHandler = new SaslServerCallbackHandler(jaasContext);
+            callbackHandler = new SaslServerCallbackHandler(jaasContexts.get(mechanism));
         else
             callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class), tokenCache);
         callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
index 8d80542..b4875d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
@@ -76,9 +76,9 @@ public final class ScramCredentialUtils {
         return props;
     }
 
-    public static void createCache(CredentialCache cache, Collection<String> enabledMechanisms) {
+    public static void createCache(CredentialCache cache, Collection<String> mechanisms) {
         for (String mechanism : ScramMechanism.mechanismNames()) {
-            if (enabledMechanisms.contains(mechanism))
+            if (mechanisms.contains(mechanism))
                 cache.createCache(mechanism, ScramCredential.class);
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 0d1fbf9..c54260d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -144,14 +144,13 @@ public class SslFactory implements Reconfigurable {
     }
 
     @Override
-    public boolean validateReconfiguration(Map<String, ?> configs) {
+    public void validateReconfiguration(Map<String, ?> configs) {
         try {
             SecurityStore newKeystore = maybeCreateNewKeystore(configs);
             if (newKeystore != null)
                 createSSLContext(newKeystore);
-            return true;
         } catch (Exception e) {
-            throw new KafkaException("Validation of dynamic config update failed", e);
+            throw new ConfigException("Validation of dynamic config update failed", e);
         }
     }
 
@@ -163,7 +162,7 @@ public class SslFactory implements Reconfigurable {
                 this.sslContext = createSSLContext(newKeystore);
                 this.keystore = newKeystore;
             } catch (Exception e) {
-                throw new KafkaException("Reconfiguration of SSL keystore failed", e);
+                throw new ConfigException("Reconfiguration of SSL keystore failed", e);
             }
         }
     }
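Throwing ConfigException rather than KafkaException matters because of the new
Reconfigurable contract above: validation works by actually building an SSLContext from
the candidate keystore, and on failure the ConfigException message is what reaches the
client in the AlterConfigs response. A hedged caller sketch, with placeholder keystore
path and password:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.common.security.ssl.SslFactory;

    public class SslValidationSketch {
        static void tryValidate(SslFactory sslFactory) {
            Map<String, Object> newConfigs = new HashMap<>();
            newConfigs.put("ssl.keystore.location", "/path/to/new/keystore.jks");
            newConfigs.put("ssl.keystore.password", "keystore-password");
            try {
                sslFactory.validateReconfiguration(newConfigs);
            } catch (ConfigException e) {
                // Message propagates to the client in the AlterConfigs response.
                System.err.println("Rejected keystore update: " + e.getMessage());
            }
        }
    }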
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 074df10..071deed 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.config;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.metrics.FakeMetricsReporter;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsReporter;
@@ -140,6 +141,54 @@ public class AbstractConfigTest {
     }
 
     @Test
+    public void testValuesWithSecondaryPrefix() {
+        String prefix = "listener.name.listener1.";
+        Password saslJaasConfig1 =  new Password("test.myLoginModule1 required;");
+        Password saslJaasConfig2 =  new Password("test.myLoginModule2 required;");
+        Password saslJaasConfig3 =  new Password("test.myLoginModule3 required;");
+        Properties props = new Properties();
+        props.put("listener.name.listener1.test-mechanism.sasl.jaas.config", saslJaasConfig1.value());
+        props.put("test-mechanism.sasl.jaas.config", saslJaasConfig2.value());
+        props.put("sasl.jaas.config", saslJaasConfig3.value());
+        props.put("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
+        props.put("listener.name.listener1.gssapi.sasl.kerberos.service.name", "testkafka");
+        props.put("listener.name.listener1.gssapi.sasl.kerberos.min.time.before.relogin", "60000");
+        props.put("ssl.provider", "TEST");
+        TestSecurityConfig config = new TestSecurityConfig(props);
+        Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
+
+        // prefix with mechanism overrides global
+        assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
+        assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config"));
+        assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config"));
+        assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config"));
+        assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
+        assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config"));
+        assertFalse(config.unused().contains("sasl.jaas.config"));
+
+        // prefix with mechanism overrides default
+        assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
+        assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
+        assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd"));
+        assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
+        assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd"));
+        assertFalse(config.unused().contains("listener.name.listener1.sasl.kerberos.kinit.cmd"));
+
+        // prefix override for mechanism with no default
+        assertFalse(config.unused().contains("sasl.kerberos.service.name"));
+        assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+        assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name"));
+        assertFalse(config.unused().contains("sasl.kerberos.service.name"));
+        assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name"));
+        assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
+        // unset with no default
+        assertTrue(config.unused().contains("ssl.provider"));
+        assertNull(valuesWithPrefixOverride.get("gssapi.ssl.provider"));
+        assertTrue(config.unused().contains("ssl.provider"));
+    }
+
+    @Test
     public void testValuesWithPrefixAllOrNothing() {
         String prefix1 = "prefix1.";
         String prefix2 = "prefix2.";
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 86d26d4..6072bf5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -26,9 +26,10 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SaslChannelBuilderTest {
@@ -37,20 +38,20 @@ public class SaslChannelBuilderTest {
     public void testCloseBeforeConfigureIsIdempotent() {
         SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
         builder.close();
-        assertNull(builder.loginManager());
+        assertTrue(builder.loginManagers().isEmpty());
         builder.close();
-        assertNull(builder.loginManager());
+        assertTrue(builder.loginManagers().isEmpty());
     }
 
     @Test
     public void testCloseAfterConfigIsIdempotent() {
         SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
         builder.configure(new HashMap<String, Object>());
-        assertNotNull(builder.loginManager());
+        assertNotNull(builder.loginManagers().get("PLAIN"));
         builder.close();
-        assertNull(builder.loginManager());
+        assertTrue(builder.loginManagers().isEmpty());
         builder.close();
-        assertNull(builder.loginManager());
+        assertTrue(builder.loginManagers().isEmpty());
     }
 
     @Test
@@ -61,17 +62,18 @@ public class SaslChannelBuilderTest {
             builder.configure(Collections.singletonMap(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "1"));
             fail("Exception should have been thrown");
         } catch (KafkaException e) {
-            assertNull(builder.loginManager());
+            assertTrue(builder.loginManagers().isEmpty());
         }
         builder.close();
-        assertNull(builder.loginManager());
+        assertTrue(builder.loginManagers().isEmpty());
     }
 
     private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
         JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
-        return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"),
+        Map<String, JaasContext> jaasContexts = Collections.singletonMap("PLAIN", jaasContext);
+        return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"),
                 false, "PLAIN", true, null, null);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
index 49d5a86..e8535d2 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -188,8 +188,8 @@ public class JaasContextTest {
                 "KafkaServer { test.LoginModuleDefault required; };",
                 "plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
         ));
-        JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
-                Collections.<String, Object>emptyMap());
+        JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
+                "SOME-MECHANISM", Collections.<String, Object>emptyMap());
         assertEquals("plaintext.KafkaServer", context.name());
         assertEquals(JaasContext.Type.SERVER, context.type());
         assertEquals(1, context.configurationEntries().size());
@@ -203,8 +203,8 @@ public class JaasContextTest {
                 "KafkaServer { test.LoginModule required; };",
                 "other.KafkaServer { test.LoginModuleOther requisite; };"
         ));
-        JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
-                Collections.<String, Object>emptyMap());
+        JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
+                "SOME-MECHANISM", Collections.<String, Object>emptyMap());
         assertEquals("KafkaServer", context.name());
         assertEquals(JaasContext.Type.SERVER, context.type());
         assertEquals(1, context.configurationEntries().size());
@@ -215,24 +215,13 @@ public class JaasContextTest {
     @Test(expected = IllegalArgumentException.class)
     public void testLoadForServerWithWrongListenerName() throws IOException {
         writeConfiguration("Server", "test.LoginModule required;");
-        JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
-                Collections.<String, Object>emptyMap());
-    }
-
-    /**
-     * ListenerName can only be used with Type.SERVER.
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void testLoadForClientWithListenerName() {
-        JaasContext.load(JaasContext.Type.CLIENT, new ListenerName("foo"),
+        JaasContext.loadServerContext(new ListenerName("plaintext"), "SOME-MECHANISM",
                 Collections.<String, Object>emptyMap());
     }
 
     private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) {
-        Map<String, Object> configs = new HashMap<>();
-        if (jaasConfigProp != null)
-            configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
-        JaasContext context = JaasContext.load(contextType, null, contextType.name(), configs);
+        Password saslJaasConfig = jaasConfigProp == null ? null : new Password(jaasConfigProp);
+        JaasContext context = JaasContext.load(contextType, null, contextType.name(), saslJaasConfig);
         List<AppConfigurationEntry> entries = context.configurationEntries();
         assertEquals(1, entries.size());
         return entries.get(0);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index e3d6b7a..ef2a075 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -715,7 +715,7 @@ public class SaslAuthenticatorTest {
      * property override is used during authentication.
      */
     @Test
-    public void testDynamicJaasConfiguration() throws Exception {
+    public void testClientDynamicJaasConfiguration() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN"));
@@ -756,6 +756,37 @@ public class SaslAuthenticatorTest {
         }
     }
 
+    /**
+     * Tests dynamic JAAS configuration property for SASL server. Invalid server credentials
+     * are set in the static JVM-wide configuration instance to ensure that the dynamic
+     * property override is used during authentication.
+     */
+    @Test
+    public void testServerDynamicJaasConfiguration() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN"));
+        Map<String, Object> serverOptions = new HashMap<>();
+        serverOptions.put("user_user1", "user1-secret");
+        serverOptions.put("user_user2", "user2-secret");
+        saslServerConfigs.put("listener.name.sasl_ssl.plain." + SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("PLAIN", serverOptions));
+        TestJaasConfig staticJaasConfig = new TestJaasConfig();
+        staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
+                Collections.<String, Object>emptyMap());
+        staticJaasConfig.setClientOptions("PLAIN", "user1", "user1-secret");
+        Configuration.setConfiguration(staticJaasConfig);
+        server = createEchoServer(securityProtocol);
+
+        // Check that 'user1' can connect with static Jaas config
+        createAndCheckClientConnection(securityProtocol, "1");
+
+        // Check that user 'user2' can also connect with a Jaas config override
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
+        createAndCheckClientConnection(securityProtocol, "2");
+    }
+
     @Test
     public void testJaasConfigurationForListener() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
@@ -986,18 +1017,19 @@ public class SaslAuthenticatorTest {
             throws Exception {
         final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
         final Map<String, ?> configs = Collections.emptyMap();
-        final JaasContext jaasContext = JaasContext.load(JaasContext.Type.SERVER, listenerName, configs);
+        final JaasContext jaasContext = JaasContext.loadServerContext(listenerName, saslMechanism, configs);
+        final Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
 
         boolean isScram = ScramMechanism.isScram(saslMechanism);
         if (isScram)
             ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
-        SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
+        SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
                 securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null) {
 
             @Override
             protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
-                            TransportLayer transportLayer, Subject subject) throws IOException {
-                return new SaslServerAuthenticator(configs, id, jaasContext, subject, null,
+                            TransportLayer transportLayer, Map<String, Subject> subjects) throws IOException {
+                return new SaslServerAuthenticator(configs, id, jaasContexts, subjects, null,
                                 credentialCache, listenerName, securityProtocol, transportLayer, null) {
 
                     @Override
@@ -1032,8 +1064,10 @@ public class SaslAuthenticatorTest {
 
         final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
         final Map<String, ?> configs = Collections.emptyMap();
-        final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
-        SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
+        final JaasContext jaasContext = JaasContext.loadClientContext(configs);
+        final Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
+
+        SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts,
                 securityProtocol, listenerName, false, saslMechanism, true, null, null) {
 
             @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 51ea58e..72c2969 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -51,7 +51,7 @@ public class SaslServerAuthenticatorTest {
         TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
         Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
-        SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer);
+        SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
 
         final Capture<ByteBuffer> size = EasyMock.newCapture();
         EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
@@ -72,7 +72,7 @@ public class SaslServerAuthenticatorTest {
         TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
         Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
-        SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer);
+        SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
 
         final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243);
         final Struct headerStruct = header.toStruct();
@@ -106,12 +106,13 @@ public class SaslServerAuthenticatorTest {
         }
     }
 
-    private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer) throws IOException {
+    private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer, String mechanism) throws IOException {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
-        JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
-        Subject subject = new Subject();
-        return new SaslServerAuthenticator(configs, "node", jaasContext, subject, null, new CredentialCache(),
+        Map<String, JaasContext> jaasContexts = Collections.singletonMap(mechanism,
+                new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig));
+        Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
+        return new SaslServerAuthenticator(configs, "node", jaasContexts, subjects, null, new CredentialCache(),
                 new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames()));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
index 5336fd7..dafa79d 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -54,6 +54,20 @@ public class TestJaasConfig extends Configuration {
         return new Password(loginModule(mechanism) + " required username=" + username + " password=" + password + ";");
     }
 
+    public static Password jaasConfigProperty(String mechanism, Map<String, Object> options) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(loginModule(mechanism));
+        builder.append(" required");
+        for (Map.Entry<String, Object> option : options.entrySet()) {
+            builder.append(' ');
+            builder.append(option.getKey());
+            builder.append('=');
+            builder.append(option.getValue());
+        }
+        builder.append(';');
+        return new Password(builder.toString());
+    }
+
     public void setClientOptions(String saslMechanism, String clientUsername, String clientPassword) {
         Map<String, Object> options = new HashMap<>();
         if (clientUsername != null)
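The new map-based overload builds the same single-module JAAS value as the
username/password variant, one key=value pair per map entry. Assuming loginModule("PLAIN")
resolves to PlainLoginModule, as the rest of this test helper suggests, the produced value
looks like this:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.config.types.Password;
    import org.apache.kafka.common.security.authenticator.TestJaasConfig;

    public class JaasConfigPropertySketch {
        public static void main(String[] args) {
            Map<String, Object> options = new LinkedHashMap<>();
            options.put("user_user1", "user1-secret");
            options.put("user_user2", "user2-secret");
            Password value = TestJaasConfig.jaasConfigProperty("PLAIN", options);
            System.out.println(value.value());
            // org.apache.kafka.common.security.plain.PlainLoginModule required user_user1=user1-secret user_user2=user2-secret;
        }
    }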
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a8e1249..f36fc79 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -54,11 +54,13 @@ object KafkaController extends Logging {
 
 }
 
-class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo,
+class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo,
                       tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
 
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
+  @volatile private var brokerInfo = initialBrokerInfo
+
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
   val controllerContext = new ControllerContext
 
@@ -77,6 +79,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
   private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
+  private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
   private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
   private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
   private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
@@ -274,6 +277,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
     zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
     zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
+    unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)
 
     // reset topic deletion manager
     topicDeletionManager.reset()
@@ -360,6 +364,23 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
       topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
     }
+    registerBrokerModificationsHandler(newBrokers)
+  }
+
+  private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
+    debug(s"Register BrokerModifications handler for $brokerIds")
+    brokerIds.foreach { brokerId =>
+      val brokerModificationsHandler = new BrokerModificationsHandler(this, eventManager, brokerId)
+      zkClient.registerZNodeChangeHandler(brokerModificationsHandler)
+      brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
+    }
+  }
+
+  private def unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
+    debug(s"Unregister BrokerModifications handler for $brokerIds")
+    brokerIds.foreach { brokerId =>
+      brokerModificationsHandlers.remove(brokerId).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
+    }
   }
 
   /*
@@ -374,6 +395,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.")
     val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
     onReplicasBecomeOffline(allReplicasOnDeadBrokers)
+
+    unregisterBrokerModificationsHandler(deadBrokers)
+  }
+
+  private def onBrokerUpdate(updatedBrokers: Seq[Int]) {
+    info(s"Broker info update callback for ${updatedBrokers.mkString(",")}")
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
   }
 
   /**
@@ -613,6 +641,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
+    // register broker modifications handlers
+    registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
@@ -1209,6 +1239,29 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
+  case object BrokerModifications extends ControllerEvent {
+    override def state: ControllerState = ControllerState.BrokerChange
+
+    override def process(): Unit = {
+      if (!isActive) return
+      val curBrokers = zkClient.getAllBrokersInCluster.toSet
+      val updatedBrokers = controllerContext.liveBrokers.filter { broker =>
+        val existingBroker = curBrokers.find(_.id == broker.id)
+        existingBroker match {
+          case Some(b) => broker.endPoints != b.endPoints
+          case None => false
+        }
+      }
+      if (updatedBrokers.nonEmpty) {
+        val updatedBrokerIdsSorted = updatedBrokers.map(_.id).toSeq.sorted
+        info(s"Updated brokers: $updatedBrokers")
+
+        controllerContext.liveBrokers = curBrokers // Update broker metadata
+        onBrokerUpdate(updatedBrokerIdsSorted)
+      }
+    }
+  }
+
   case object TopicChange extends ControllerEvent {
     override def state: ControllerState = ControllerState.TopicChange
 
@@ -1458,7 +1511,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
   override val path: String = BrokerIdsZNode.path
 
-  override def handleChildChange(): Unit = eventManager.put(controller.BrokerChange)
+  override def handleChildChange(): Unit = {
+    eventManager.put(controller.BrokerChange)
+  }
+}
+
+class BrokerModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
+  override val path: String = BrokerIdZNode.path(brokerId)
+
+  override def handleDataChange(): Unit = {
+    eventManager.put(controller.BrokerModifications)
+  }
 }
 
 class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
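
With these controller changes, every live broker id gets its own data watch,
so a broker re-registering with different endpoints is noticed even without a
child change on /brokers/ids. A sketch (not part of the patch) of the path
each BrokerModificationsHandler watches:

    // BrokerIdZNode.path(brokerId) resolves to the per-broker registration znode;
    // a data change there enqueues the BrokerModifications event above.
    val brokerId = 1
    val watchedPath = s"/brokers/ids/$brokerId"
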
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index f6ec974..c11ebcb 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -54,7 +54,6 @@ import scala.util.control.ControlThrowable
  */
 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
 
-  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
   private val maxQueuedRequests = config.queuedMaxRequests
 
   private val maxConnectionsPerIp = config.maxConnectionsPerIp
@@ -72,7 +71,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val processors = new ConcurrentHashMap[Int, Processor]()
   private var nextProcessorId = 0
 
-  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
+  private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
   private var connectionQuotas: ConnectionQuotas = _
   private var stoppedProcessingRequests = false
 
@@ -82,7 +81,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def startup() {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
-      createProcessors(config.numNetworkThreads)
+      createProcessors(config.numNetworkThreads, config.listeners)
     }
 
     newGauge("NetworkProcessorAvgIdlePercent",
@@ -111,14 +110,17 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     info("Started " + acceptors.size + " acceptor threads")
   }
 
-  private def createProcessors(newProcessorsPerListener: Int): Unit = synchronized {
+  private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
+
+  private def createProcessors(newProcessorsPerListener: Int,
+                               endpoints: Seq[EndPoint]): Unit = synchronized {
 
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
     val brokerId = config.brokerId
 
     val numProcessorThreads = config.numNetworkThreads
-    config.listeners.foreach { endpoint =>
+    endpoints.foreach { endpoint =>
       val listenerName = endpoint.listenerName
       val securityProtocol = endpoint.securityProtocol
       val listenerProcessors = new ArrayBuffer[Processor]()
@@ -130,12 +132,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       }
       listenerProcessors.foreach(p => processors.put(p.id, p))
 
-      val acceptor = acceptors.getOrElseUpdate(endpoint, {
-        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
-        KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
-        acceptor.awaitStartup()
-        acceptor
-      })
+      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
+      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
+      acceptor.awaitStartup()
+      acceptors.put(endpoint, acceptor)
       acceptor.addProcessors(listenerProcessors)
     }
   }
@@ -153,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def stopProcessingRequests() = {
     info("Stopping socket server request processors")
     this.synchronized {
-      acceptors.values.foreach(_.shutdown)
+      acceptors.asScala.values.foreach(_.shutdown)
       processors.asScala.values.foreach(_.shutdown)
       requestChannel.clear()
       stoppedProcessingRequests = true
@@ -161,12 +161,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     info("Stopped socket server request processors")
   }
 
-  def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = {
+  def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
     info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
     if (newNumNetworkThreads > oldNumNetworkThreads)
-      createProcessors(newNumNetworkThreads - oldNumNetworkThreads)
+      createProcessors(newNumNetworkThreads - oldNumNetworkThreads, config.listeners)
     else if (newNumNetworkThreads < oldNumNetworkThreads)
-      acceptors.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
+      acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
   }
 
   /**
@@ -185,9 +185,22 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   def boundPort(listenerName: ListenerName): Int = {
     try {
-      acceptors(endpoints(listenerName)).serverChannel.socket.getLocalPort
+      acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort
     } catch {
-      case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
+      case e: Exception =>
+        throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
+    }
+  }
+
+  def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
+    info(s"Adding listeners for endpoints $listenersAdded")
+    createProcessors(config.numNetworkThreads, listenersAdded)
+  }
+
+  def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
+    info(s"Removing listeners for endpoints $listenersRemoved")
+    listenersRemoved.foreach { endpoint =>
+      acceptors.asScala.remove(endpoint).foreach(_.shutdown())
     }
   }
 
@@ -239,8 +252,8 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
    * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
    */
   def shutdown(): Unit = {
-    alive.set(false)
-    wakeup()
+    if (alive.getAndSet(false))
+      wakeup()
     shutdownLatch.await()
   }
 
@@ -312,6 +325,11 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
   }
 
+  override def shutdown(): Unit = {
+    super.shutdown()
+    processors.foreach(_.shutdown())
+  }
+
   /**
    * Accept loop that checks for new connection attempts
    */
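
Together with the new addListeners/removeListeners methods, these SocketServer
changes let endpoints come and go at runtime. A self-contained sketch (not
part of the patch; plain strings stand in for kafka.cluster.EndPoint) of how
the added and removed sets are derived, mirroring DynamicListenerConfig.reconfigure
later in this patch:

    val oldListeners = Map("PLAINTEXT" -> "PLAINTEXT://:9092", "INTERNAL" -> "SSL://:9093")
    val newListeners = Map("PLAINTEXT" -> "PLAINTEXT://:9092", "EXTERNAL" -> "SASL_SSL://:9094")
    val removed = (oldListeners.keySet -- newListeners.keySet).map(oldListeners) // Set(SSL://:9093)
    val added   = (newListeners.keySet -- oldListeners.keySet).map(newListeners) // Set(SASL_SSL://:9094)
    // removed endpoints go to removeListeners (acceptor and its processors shut down),
    // added endpoints to addListeners (new acceptor plus numNetworkThreads processors each)
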
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 120e8f9..0e7ebb6 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -17,7 +17,7 @@
 
 package kafka.security
 
-import java.util.{List, Properties}
+import java.util.{Collection, Properties}
 
 import org.apache.kafka.common.security.authenticator.CredentialCache
 import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
@@ -25,10 +25,10 @@ import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef._
 import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
 
-class CredentialProvider(saslEnabledMechanisms: List[String], val tokenCache: DelegationTokenCache) {
+class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) {
 
   val credentialCache = new CredentialCache
-  ScramCredentialUtils.createCache(credentialCache, saslEnabledMechanisms)
+  ScramCredentialUtils.createCache(credentialCache, scramMechanisms)
 
   def updateCredentials(username: String, config: Properties) {
     for (mechanism <- ScramMechanism.values()) {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9c85f9b..a95de0a 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -17,20 +17,22 @@
 
 package kafka.server
 
-import java.nio.charset.StandardCharsets
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogConfig, LogManager}
 import kafka.server.DynamicBrokerConfig._
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs}
-import org.apache.kafka.common.network.ListenerReconfigurable
 import org.apache.kafka.common.metrics.MetricsReporter
-import org.apache.kafka.common.utils.{Base64, Utils}
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
+import org.apache.kafka.common.security.authenticator.LoginManager
+import org.apache.kafka.common.utils.Utils
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -71,11 +73,7 @@ import scala.collection.JavaConverters._
   */
 object DynamicBrokerConfig {
 
-  private val DynamicPasswordConfigs = Set(
-    SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
-    SslConfigs.SSL_KEY_PASSWORD_CONFIG
-  )
-  private val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
+  private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
 
   val AllDynamicConfigs = mutable.Set[String]()
   AllDynamicConfigs ++= DynamicSecurityConfigs
@@ -83,11 +81,17 @@ object DynamicBrokerConfig {
   AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
   AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs
   AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp)
+  AllDynamicConfigs ++= DynamicListenerConfig.ReconfigurableConfigs
 
-  private val PerBrokerConfigs = DynamicSecurityConfigs
+  private val PerBrokerConfigs = DynamicSecurityConfigs ++
+    DynamicListenerConfig.ReconfigurableConfigs
 
   val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
 
+  private[server] val DynamicPasswordConfigs = {
+    val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet
+    AllDynamicConfigs.intersect(passwordConfigs)
+  }
 
   def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
     name match {
@@ -123,11 +127,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig = kafkaConfig
+  private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
   private[server] def initialize(zkClient: KafkaZkClient): Unit = {
     val adminZkClient = new AdminZkClient(zkClient)
     updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
-    updateBrokerConfig(kafkaConfig.brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString))
+    val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
+    val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
+    updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
   }
 
   def addReconfigurables(kafkaServer: KafkaServer): Unit = {
@@ -136,6 +143,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -187,22 +195,37 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     }
   }
 
+  private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
+    secret.map { secret =>
+      new PasswordEncoder(secret,
+        kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
+        kafkaConfig.passwordEncoderCipherAlgorithm,
+        kafkaConfig.passwordEncoderKeyLength,
+        kafkaConfig.passwordEncoderIterations)
+    }
+  }
+
+  private def passwordEncoder: PasswordEncoder = {
+    dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured"))
+  }
+
   private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
     val props = configProps.clone().asInstanceOf[Properties]
-    // TODO (KAFKA-6246): encrypt passwords
+
     def encodePassword(configName: String): Unit = {
       val value = props.getProperty(configName)
       if (value != null) {
         if (!perBrokerConfig)
           throw new ConfigException("Password config can be defined only at broker level")
-        props.setProperty(configName, Base64.encoder.encodeToString(value.getBytes(StandardCharsets.UTF_8)))
+        props.setProperty(configName, passwordEncoder.encode(new Password(value)))
       }
     }
     DynamicPasswordConfigs.foreach(encodePassword)
     props
   }
 
-  private[server] def fromPersistentProps(persistentProps: Properties, perBrokerConfig: Boolean): Properties = {
+  private[server] def fromPersistentProps(persistentProps: Properties,
+                                          perBrokerConfig: Boolean): Properties = {
     val props = persistentProps.clone().asInstanceOf[Properties]
 
     // Remove all invalid configs from `props`
@@ -219,17 +242,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     if (!perBrokerConfig)
       removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
 
-    // TODO (KAFKA-6246): encrypt passwords
     def decodePassword(configName: String): Unit = {
       val value = props.getProperty(configName)
       if (value != null) {
-        props.setProperty(configName, new String(Base64.decoder.decode(value), StandardCharsets.UTF_8))
+        try {
+          props.setProperty(configName, passwordEncoder.decode(value).value)
+        } catch {
+          case e: Exception =>
+            error(s"Dynamic password config $configName could not be decoded, ignoring.", e)
+            props.remove(configName)
+        }
       }
     }
+
     DynamicPasswordConfigs.foreach(decodePassword)
     props
   }
 
+  // If the secret has changed, password.encoder.old.secret contains the old secret that was used
+  // to encode the configs in ZK. Decode passwords using the old secret and update ZK with values
+  // encoded using the current secret. Decoding errors are ignored since the old secret may still
+  // be configured after the passwords have already been re-encoded on an earlier restart.
+  private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
+    val props = persistentProps.clone().asInstanceOf[Properties]
+    if (props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) {
+      maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
+        DynamicPasswordConfigs.foreach { configName =>
+          val value = props.getProperty(configName)
+          if (value != null) {
+            val decoded = try {
+              Some(passwordDecoder.decode(value).value)
+            } catch {
+              case _: Exception =>
+                debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
+                None
+            }
+            decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) }
+          }
+        }
+        adminZkClient.changeBrokerConfig(Seq(kafkaConfig.brokerId), props)
+      }
+    }
+    props
+  }
+
   private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
     def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
       if (invalidPropNames.nonEmpty)
@@ -331,14 +387,18 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     newProps ++= staticBrokerConfigs
     overrideProps(newProps, dynamicDefaultConfigs)
     overrideProps(newProps, dynamicBrokerConfigs)
-    val newConfig = processReconfiguration(newProps, validateOnly = false)
+    val oldConfig = currentConfig
+    val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
     if (newConfig ne currentConfig) {
       currentConfig = newConfig
-      kafkaConfig.updateCurrentConfig(currentConfig)
+      kafkaConfig.updateCurrentConfig(newConfig)
+
+      // Process BrokerReconfigurable updates after current config is updated
+      brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
     }
   }
 
-  private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): KafkaConfig = {
+  private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): (KafkaConfig, List[BrokerReconfigurable]) = {
     val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
     val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
     if (updatedMap.nonEmpty) {
@@ -352,16 +412,22 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
             val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
             val updatedKeys = updatedConfigs(newValues, oldValues).keySet
             if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys))
-              processReconfigurable(listenerReconfigurable, newValues, customConfigs, validateOnly)
+              processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
           case reconfigurable =>
             if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
-              processReconfigurable(reconfigurable, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+              processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
         }
+
+        // BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
+        val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
         brokerReconfigurables.foreach { reconfigurable =>
-          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet))
-            processBrokerReconfigurable(reconfigurable, currentConfig, newConfig, validateOnly)
+          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) {
+            reconfigurable.validateReconfiguration(newConfig)
+            if (!validateOnly)
+              brokerReconfigurablesToUpdate += reconfigurable
+          }
         }
-        newConfig
+        (newConfig, brokerReconfigurablesToUpdate.toList)
       } catch {
         case e: Exception =>
           if (!validateOnly)
@@ -370,7 +436,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
     }
     else
-      currentConfig
+      (currentConfig, List.empty)
   }
 
   private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = {
@@ -378,27 +444,25 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   }
 
   private def processReconfigurable(reconfigurable: Reconfigurable,
+                                    updatedConfigNames: Set[String],
                                     allNewConfigs: util.Map[String, _],
                                     newCustomConfigs: util.Map[String, Object],
                                     validateOnly: Boolean): Unit = {
     val newConfigs = new util.HashMap[String, Object]
     allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
     newConfigs.putAll(newCustomConfigs)
-    if (validateOnly) {
-      if (!reconfigurable.validateReconfiguration(newConfigs))
-        throw new ConfigException("Validation of dynamic config update failed")
-    } else
-      reconfigurable.reconfigure(newConfigs)
-  }
+    try {
+      reconfigurable.validateReconfiguration(newConfigs)
+    } catch {
+      case e: ConfigException => throw e
+      case _: Exception =>
+        throw new ConfigException(s"Validation of dynamic config update of $updatedConfigNames failed with class ${reconfigurable.getClass}")
+    }
 
-  private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable,
-                                          oldConfig: KafkaConfig,
-                                          newConfig: KafkaConfig,
-                                          validateOnly: Boolean): Unit = {
-    if (validateOnly)
-      reconfigurable.validateReconfiguration(newConfig)
-    else
-      reconfigurable.reconfigure(oldConfig, newConfig)
+    if (!validateOnly) {
+      info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames custom configs: $newCustomConfigs")
+      reconfigurable.reconfigure(newConfigs)
+    }
   }
 }
 
@@ -427,11 +491,10 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
     DynamicLogConfig.ReconfigurableConfigs.asJava
   }
 
-  override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
     // For update of topic config overrides, only config names and types are validated
     // Names and types have already been validated. For consistency with topic config
     // validation, no additional validation is performed.
-    true
   }
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
@@ -538,7 +601,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
     configs
   }
 
-  override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
     val updatedMetricsReporters = metricsReporterClasses(configs)
 
     // Ensure all the reporter classes can be loaded and have a default constructor
@@ -548,10 +611,11 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
     }
 
     // Validate the new configuration using every reconfigurable reporter instance that is not being deleted
-    currentReporters.values.forall {
+    currentReporters.values.foreach {
       case reporter: Reconfigurable =>
-        !updatedMetricsReporters.contains(reporter.getClass.getName) || reporter.validateReconfiguration(configs)
-      case _ => true
+        if (updatedMetricsReporters.contains(reporter.getClass.getName))
+          reporter.validateReconfiguration(configs)
+      case _ =>
     }
   }
 
@@ -588,3 +652,103 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
     configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
   }
 }
+object DynamicListenerConfig {
+
+  val ReconfigurableConfigs = Set(
+    // Listener configs
+    KafkaConfig.AdvertisedListenersProp,
+    KafkaConfig.ListenersProp,
+    KafkaConfig.ListenerSecurityProtocolMapProp,
+
+    // SSL configs
+    KafkaConfig.PrincipalBuilderClassProp,
+    KafkaConfig.SslProtocolProp,
+    KafkaConfig.SslProviderProp,
+    KafkaConfig.SslCipherSuitesProp,
+    KafkaConfig.SslEnabledProtocolsProp,
+    KafkaConfig.SslKeystoreTypeProp,
+    KafkaConfig.SslKeystoreLocationProp,
+    KafkaConfig.SslKeystorePasswordProp,
+    KafkaConfig.SslKeyPasswordProp,
+    KafkaConfig.SslTruststoreTypeProp,
+    KafkaConfig.SslTruststoreLocationProp,
+    KafkaConfig.SslTruststorePasswordProp,
+    KafkaConfig.SslKeyManagerAlgorithmProp,
+    KafkaConfig.SslTrustManagerAlgorithmProp,
+    KafkaConfig.SslEndpointIdentificationAlgorithmProp,
+    KafkaConfig.SslSecureRandomImplementationProp,
+    KafkaConfig.SslClientAuthProp,
+
+    // SASL configs
+    KafkaConfig.SaslMechanismInterBrokerProtocolProp,
+    KafkaConfig.SaslJaasConfigProp,
+    KafkaConfig.SaslEnabledMechanismsProp,
+    KafkaConfig.SaslKerberosServiceNameProp,
+    KafkaConfig.SaslKerberosKinitCmdProp,
+    KafkaConfig.SaslKerberosTicketRenewWindowFactorProp,
+    KafkaConfig.SaslKerberosTicketRenewJitterProp,
+    KafkaConfig.SaslKerberosMinTimeBeforeReloginProp,
+    KafkaConfig.SaslKerberosPrincipalToLocalRulesProp
+  )
+}
+
+class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable with Logging {
+
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicListenerConfig.ReconfigurableConfigs
+  }
+
+  def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+
+    def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
+      kafkaConfig.originals.asScala
+        .filterKeys(_.startsWith(prefix))
+        .filterKeys(k => !DynamicSecurityConfigs.contains(k))
+    }
+
+    val oldConfig = server.config
+    val newListeners = listenersToMap(newConfig.listeners)
+    val newAdvertisedListeners = listenersToMap(newConfig.advertisedListeners)
+    val oldListeners = listenersToMap(oldConfig.listeners)
+    if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
+      throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
+    if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
+      throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}' don't match")
+    newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
+      val prefix = listenerName.configPrefix
+      val newListenerProps = immutableListenerConfigs(newConfig, prefix)
+      val oldListenerProps = immutableListenerConfigs(oldConfig, prefix)
+      if (newListenerProps != oldListenerProps)
+        throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " +
+          "restart broker or create a new listener for update")
+      if (oldConfig.listenerSecurityProtocolMap(listenerName) != newConfig.listenerSecurityProtocolMap(listenerName))
+        throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
+    }
+    if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
+      throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
+  }
+
+  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val newListeners = newConfig.listeners
+    val newListenerMap = listenersToMap(newListeners)
+    val oldListeners = oldConfig.listeners
+    val oldListenerMap = listenersToMap(oldListeners)
+    val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
+    val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
+
+    // Clear SASL login cache to force re-login
+    if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
+      LoginManager.closeAll()
+
+    server.socketServer.removeListeners(listenersRemoved)
+    if (listenersAdded.nonEmpty)
+      server.socketServer.addListeners(listenersAdded)
+
+    server.zkClient.updateBrokerInfoInZk(server.createBrokerInfo)
+  }
+
+  private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
+    listeners.map(e => (e.listenerName, e)).toMap
+
+}
+
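
Listener-scoped overrides rely on the ListenerConfigRegex defined near the top
of this file to map a prefixed property back to its base name. A quick
illustration (not part of the patch):

    val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
    val base = "listener.name.internal.ssl.keystore.location" match {
      case ListenerConfigRegex(baseName) => baseName // "ssl.keystore.location"
      case other => other
    }
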
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2dd6951..1f448af 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1291,7 +1291,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms))
+    sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet()))
   }
 
   def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 144dd65..64698f7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -223,6 +223,11 @@ object Defaults {
   val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L
   val DelegationTokenExpiryTimeMsDefault = 24 * 60 * 60 * 1000L
   val DelegationTokenExpiryCheckIntervalMsDefault = 1 * 60 * 60 * 1000L
+
+  /** ********* Password encryption configuration for dynamic configs *********/
+  val PasswordEncoderCipherAlgorithm = "AES/CBC/PKCS5Padding"
+  val PasswordEncoderKeyLength = 128
+  val PasswordEncoderIterations = 4096
 }
 
 object KafkaConfig {
@@ -412,6 +417,7 @@ object KafkaConfig {
 
   /** ********* SASL Configuration ****************/
   val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
+  val SaslJaasConfigProp = SaslConfigs.SASL_JAAS_CONFIG
   val SaslEnabledMechanismsProp = BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG
   val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
   val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD
@@ -426,6 +432,14 @@ object KafkaConfig {
   val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
   val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
 
+  /** ********* Password encryption configuration for dynamic configs *********/
+  val PasswordEncoderSecretProp = "password.encoder.secret"
+  val PasswordEncoderOldSecretProp = "password.encoder.old.secret"
+  val PasswordEncoderKeyFactoryAlgorithmProp = "password.encoder.keyfactory.algorithm"
+  val PasswordEncoderCipherAlgorithmProp = "password.encoder.cipher.algorithm"
+  val PasswordEncoderKeyLengthProp = "password.encoder.key.length"
+  val PasswordEncoderIterationsProp = "password.encoder.iterations"
+
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
@@ -689,6 +703,7 @@ object KafkaConfig {
 
   /** ********* Sasl Configuration ****************/
   val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
+  val SaslJaasConfigDoc = SaslConfigs.SASL_JAAS_CONFIG_DOC
   val SaslEnabledMechanismsDoc = SaslConfigs.SASL_ENABLED_MECHANISMS_DOC
   val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC
   val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC
@@ -704,6 +719,16 @@ object KafkaConfig {
   val DelegationTokenExpiryTimeMsDoc = "The token validity time in seconds before the token needs to be renewed. Default value 1 day."
   val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."
 
+  /** ********* Password encryption configuration for dynamic configs *********/
+  val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker."
+  val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " +
+    "This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
+    s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up."
+  val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
+    "Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."
+  val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords."
+  val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
+  val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
 
   private val configDef = {
     import ConfigDef.Importance._
@@ -898,6 +923,7 @@ object KafkaConfig {
 
       /** ********* Sasl Configuration ****************/
       .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
+      .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
       .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc)
       .define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, SaslKerberosServiceNameDoc)
       .define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc)
@@ -911,6 +937,13 @@ object KafkaConfig {
       .define(DelegationTokenExpiryTimeMsProp, LONG, Defaults.DelegationTokenExpiryTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenExpiryTimeMsDoc)
       .define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DelegationTokenExpiryCheckIntervalMsDefault, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
 
+      /** ********* Password encryption configuration for dynamic configs *********/
+      .define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc)
+      .define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc)
+      .define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc)
+      .define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc)
+      .define(PasswordEncoderKeyLengthProp, INT, Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
+      .define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc)
   }
 
   def configNames() = configDef.names().asScala.toList.sorted
@@ -942,9 +975,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   def this(props: java.util.Map[_, _]) = this(props, true, None)
   def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
-  private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
   // Cache the current config to avoid acquiring read lock to access from dynamicConfig
   @volatile private var currentConfig = this
+  private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
 
   private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
     this.currentConfig = newConfig
@@ -1076,8 +1109,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
   def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
 
-  val (interBrokerListenerName, interBrokerSecurityProtocol) = getInterBrokerListenerNameAndSecurityProtocol
-
   // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
   // is passed, `0.10.0-IV0` may be picked)
   val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
@@ -1120,32 +1151,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
   val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
 
-  /** ********* SSL Configuration **************/
-  val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
-  val sslProtocol = getString(KafkaConfig.SslProtocolProp)
-  val sslProvider = getString(KafkaConfig.SslProviderProp)
-  val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
-  def sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
-  def sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
-  def sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
-  def sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
-  val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
-  val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
-  val sslTruststorePassword = getPassword(KafkaConfig.SslTruststorePasswordProp)
-  val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp)
-  val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp)
-  val sslClientAuth = getString(KafkaConfig.SslClientAuthProp)
-  val sslCipher = getList(KafkaConfig.SslCipherSuitesProp)
-
-  /** ********* Sasl Configuration **************/
-  val saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
-  val saslEnabledMechanisms = getList(KafkaConfig.SaslEnabledMechanismsProp)
-  val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp)
-  val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp)
-  val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp)
-  val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
-  val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
-  val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
+  /** ********* SSL/SASL Configuration **************/
+  // Security configs may be overridden for listeners, so it is not safe to use the base values
+  // Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be
+  // retrieved using KafkaConfig#valuesWithPrefixOverride
+  private def saslEnabledMechanisms(listenerName: ListenerName): Set[String] = {
+    val value = valuesWithPrefixOverride(listenerName.configPrefix).get(KafkaConfig.SaslEnabledMechanismsProp)
+    if (value != null)
+      value.asInstanceOf[util.List[String]].asScala.toSet
+    else
+      Set.empty[String]
+  }
+
+  def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
+  def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
+  def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
   val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
 
   /** ********* DelegationToken Configuration **************/
@@ -1155,6 +1175,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
   val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
 
+  /** ********* Password encryption configuration for dynamic configs *********/
+  def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp))
+  def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp))
+  def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp)
+  def passwordEncoderKeyFactoryAlgorithm = Option(getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp))
+  def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp)
+  def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)
+
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
   val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
@@ -1170,9 +1198,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   def compressionType = getString(KafkaConfig.CompressionTypeProp)
-  val listeners: Seq[EndPoint] = getListeners
-  val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
-  private[kafka] lazy val listenerSecurityProtocolMap = getListenerSecurityProtocolMap
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
     dynamicConfig.addReconfigurable(reconfigurable)
@@ -1203,7 +1228,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   // If the user did not define listeners but did define host or port, let's use them in backward compatible way
   // If none of those are defined, we default to PLAINTEXT://:9092
-  private def getListeners: Seq[EndPoint] = {
+  def listeners: Seq[EndPoint] = {
     Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>
       CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)
     }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
@@ -1212,14 +1237,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   // If the user defined advertised listeners, we use those
   // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
   // If none of these are defined, we'll use the listeners
-  private def getAdvertisedListeners: Seq[EndPoint] = {
+  def advertisedListeners: Seq[EndPoint] = {
     val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
     if (advertisedListenersProp != null)
       CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap)
     else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null)
       CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap)
     else
-      getListeners
+      listeners
   }
 
   private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
@@ -1248,7 +1273,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
-  private def getListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
+  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
     getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
       .map { case (listenerName, protocolName) =>
       ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
@@ -1295,7 +1320,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
     require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
       s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
-    require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
+    require(!interBrokerUsesSasl || saslEnabledMechanisms(interBrokerListenerName).contains(saslMechanismInterBrokerProtocol),
       s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication")
     require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
       s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 747a0df..0212181 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
@@ -236,8 +237,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         logManager.startup()
 
         metadataCache = new MetadataCache(config.brokerId)
-        tokenCache = new DelegationTokenCache(config.saslEnabledMechanisms)
-        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, tokenCache)
+        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
+        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
+        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
+        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
         socketServer = new SocketServer(config, metrics, time, credentialProvider)
         socketServer.startup()
@@ -366,7 +369,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
   }
 
-  private def createBrokerInfo: BrokerInfo = {
+  private[server] def createBrokerInfo: BrokerInfo = {
     val listeners = config.advertisedListeners.map { endpoint =>
       if (endpoint.port == 0)
         endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
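
The new PasswordEncoder added below is what DynamicBrokerConfig uses to
round-trip dynamic password configs. A usage sketch against the constructor it
defines (all values here are illustrative):

    import kafka.utils.PasswordEncoder
    import org.apache.kafka.common.config.types.Password

    val encoder = new PasswordEncoder(new Password("encoder-secret"),
      keyFactoryAlgorithm = None,               // defaults to PBKDF2WithHmacSHA512, else ...SHA1
      cipherAlgorithm = "AES/CBC/PKCS5Padding",
      keyLength = 128,
      iterations = 4096)
    val persisted = encoder.encode(new Password("keystore-password"))
    // persisted is a CSV map (cipherAlgorithm, salt, iterations, encryptedPassword,
    // passwordLength, initializationVector, ...): everything decode needs except the secret.
    assert(encoder.decode(persisted).value == "keystore-password")
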
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
new file mode 100644
index 0000000..ff11e24
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.utils
+
+import java.nio.charset.StandardCharsets
+import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom}
+import java.security.spec.AlgorithmParameterSpec
+import javax.crypto.{Cipher, SecretKeyFactory}
+import javax.crypto.spec._
+
+import kafka.utils.PasswordEncoder._
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.utils.Base64
+
+import scala.collection.Map
+
+object PasswordEncoder {
+  val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm"
+  val CipherAlgorithmProp = "cipherAlgorithm"
+  val InitializationVectorProp = "initializationVector"
+  val KeyLengthProp = "keyLength"
+  val SaltProp = "salt"
+  val IterationsProp = "iterations"
+  val EncryptedPasswordProp = "encryptedPassword"
+  val PasswordLengthProp = "passwordLength"
+}
+
+/**
+  * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map
+  * containing the encoded password in base64 along with the properties used for encryption.
+  *
+  * @param secret The secret used for encoding and decoding
+  * @param keyFactoryAlgorithm  Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is
+  *                             used if available, PBKDF2WithHmacSHA1 otherwise.
+  * @param cipherAlgorithm Cipher algorithm used for encoding.
+  * @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
+  * @param iterations Iteration count used for encoding.
+  *
+  * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
+  * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
+  *
+  */
+class PasswordEncoder(secret: Password,
+                      keyFactoryAlgorithm: Option[String],
+                      cipherAlgorithm: String,
+                      keyLength: Int,
+                      iterations: Int) extends Logging {
+
+  private val secureRandom = new SecureRandom
+  private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
+
+  def encode(password: Password): String = {
+    val salt = new Array[Byte](256)
+    secureRandom.nextBytes(salt)
+    val cipher = Cipher.getInstance(cipherAlgorithm)
+    val keyFactory = secretKeyFactory(keyFactoryAlgorithm)
+    val keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations)
+    cipher.init(Cipher.ENCRYPT_MODE, keySpec)
+    val encryptedPassword = cipher.doFinal(password.value.getBytes(StandardCharsets.UTF_8))
+    val encryptedMap = Map(
+      KeyFactoryAlgorithmProp -> keyFactory.getAlgorithm,
+      CipherAlgorithmProp -> cipherAlgorithm,
+      KeyLengthProp -> keyLength.toString,
+      SaltProp -> base64Encode(salt),
+      IterationsProp -> iterations.toString,
+      EncryptedPasswordProp -> base64Encode(encryptedPassword),
+      PasswordLengthProp -> password.value.length.toString
+    ) ++ cipherParamsEncoder.toMap(cipher.getParameters)
+    encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
+  }
+
+  def decode(encodedPassword: String): Password = {
+    val params = CoreUtils.parseCsvMap(encodedPassword)
+    val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
+    val cipherAlg = params(CipherAlgorithmProp)
+    val keyLength = params(KeyLengthProp).toInt
+    val salt = base64Decode(params(SaltProp))
+    val iterations = params(IterationsProp).toInt
+    val encryptedPassword = base64Decode(params(EncryptedPasswordProp))
+    val passwordLengthProp = params(PasswordLengthProp).toInt
+    val cipher = Cipher.getInstance(cipherAlg)
+    val keyFactory = secretKeyFactory(Some(keyFactoryAlg))
+    val keySpec = secretKeySpec(keyFactory, cipherAlg, keyLength, salt, iterations)
+    cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsEncoder.toParameterSpec(params))
+    val password = try {
+      val decrypted = cipher.doFinal(encryptedPassword)
+      new String(decrypted, StandardCharsets.UTF_8)
+    } catch {
+      case e: Exception => throw new ConfigException("Password could not be decoded", e)
+    }
+    if (password.length != passwordLengthProp) // Sanity check
+      throw new ConfigException("Password could not be decoded, sanity check of length failed")
+    new Password(password)
+  }
+
+  private def secretKeyFactory(keyFactoryAlg: Option[String]): SecretKeyFactory = {
+    keyFactoryAlg match {
+      case Some(algorithm) => SecretKeyFactory.getInstance(algorithm)
+      case None =>
+        try {
+          SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
+        } catch {
+          case _: NoSuchAlgorithmException => SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1")
+        }
+    }
+  }
+
+  private def secretKeySpec(keyFactory: SecretKeyFactory,
+                            cipherAlg: String,
+                            keyLength: Int,
+                            salt: Array[Byte], iterations: Int): SecretKeySpec = {
+    val keySpec = new PBEKeySpec(secret.value.toCharArray, salt, iterations, keyLength)
+    val algorithm = if (cipherAlg.indexOf('/') > 0) cipherAlg.substring(0, cipherAlg.indexOf('/')) else cipherAlg
+    new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded, algorithm)
+  }
+
+  private def base64Encode(bytes: Array[Byte]): String = Base64.encoder.encodeToString(bytes)
+
+  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.decoder.decode(encoded)
+
+  private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
+    val aesPattern = "AES/(.*)/.*".r
+    cipherAlgorithm match {
+      case aesPattern("GCM") => new GcmParamsEncoder
+      case _ => new IvParamsEncoder
+    }
+  }
+
+  private trait CipherParamsEncoder {
+    def toMap(cipher: AlgorithmParameters): Map[String, String]
+    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec
+  }
+
+  private class IvParamsEncoder extends CipherParamsEncoder {
+    def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
+      if (cipherParams != null) {
+        val ivSpec = cipherParams.getParameterSpec(classOf[IvParameterSpec])
+        Map(InitializationVectorProp -> base64Encode(ivSpec.getIV))
+      } else
+        throw new IllegalStateException("Could not determine initialization vector for cipher")
+    }
+    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
+      new IvParameterSpec(base64Decode(paramMap(InitializationVectorProp)))
+    }
+  }
+
+  private class GcmParamsEncoder extends CipherParamsEncoder {
+    def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
+      if (cipherParams != null) {
+        val spec = cipherParams.getParameterSpec(classOf[GCMParameterSpec])
+        Map(InitializationVectorProp -> base64Encode(spec.getIV),
+            "authenticationTagLength" -> spec.getTLen.toString)
+      } else
+        throw new IllegalStateException("Could not determine initialization vector for cipher")
+    }
+    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
+      new GCMParameterSpec(paramMap("authenticationTagLength").toInt, base64Decode(paramMap(InitializationVectorProp)))
+    }
+  }
+}
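
For reference, a minimal round-trip sketch of the encoder above. The secret,
cipher settings and passwords are made-up values; the cipher, key-length and
iteration choices shown here are assumptions, not values mandated by the class:

    import kafka.utils.PasswordEncoder
    import org.apache.kafka.common.config.types.Password

    object PasswordEncoderExample extends App {
      // Helper that builds an encoder for a given secret, using an assumed
      // PBKDF2 + AES/CBC configuration.
      def encoder(secret: String) = new PasswordEncoder(
        new Password(secret),
        keyFactoryAlgorithm = None, // falls back to PBKDF2WithHmacSHA512, then SHA1
        cipherAlgorithm = "AES/CBC/PKCS5Padding",
        keyLength = 128,
        iterations = 4096)

      // encode() emits a CSV map: keyFactoryAlgorithm, cipherAlgorithm, keyLength,
      // salt, iterations, encryptedPassword, passwordLength and the cipher IV.
      val encoded = encoder("new-secret").encode(new Password("keystore-password"))

      // decode() reads those parameters back out of the CSV map, so only the
      // secret must match the one used for encoding.
      assert(encoder("new-secret").decode(encoded).value == "keystore-password")

      // Secret rotation as exercised by the reconfiguration test further below:
      // decode with the old secret, re-encode with the new one.
      val oldEncoded = encoder("old-secret").encode(new Password("keystore-password"))
      val reEncoded = encoder("new-secret").encode(encoder("old-secret").decode(oldEncoded))
      assert(encoder("new-secret").decode(reEncoded).value == "keystore-password")
    }
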
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d683a8d..b545455 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -82,6 +82,13 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
     info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}")
   }
 
+  def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
+    val brokerIdPath = brokerInfo.path
+    val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
+    retryRequestUntilConnected(setDataRequest)
+    info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
+  }
+
   /**
    * Gets topic partition states for the given partitions.
   * @param partitions the partitions for which we want to get states.
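
For reference, a short sketch of how the new method is meant to be used: after
listeners are added or removed dynamically, the broker republishes its endpoints
so the controller can propagate them. Here `zkClient` is a KafkaZkClient and
`buildUpdatedBrokerInfo` is a hypothetical helper:

    // Re-register this broker's metadata after a dynamic listener change.
    val updatedInfo: BrokerInfo = buildUpdatedBrokerInfo(newEndPoints) // hypothetical
    zkClient.updateBrokerInfoInZk(updatedInfo)
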
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 6f45bca..867e03d 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -430,13 +430,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name)
     assertFalse(listenerSecurityProtocolMap.isDefault)
     assertFalse(listenerSecurityProtocolMap.isSensitive)
-    assertTrue(listenerSecurityProtocolMap.isReadOnly)
+    assertFalse(listenerSecurityProtocolMap.isReadOnly)
     val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp)
     assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name)
     assertNull(truststorePassword.value)
     assertFalse(truststorePassword.isDefault)
     assertTrue(truststorePassword.isSensitive)
-    assertTrue(truststorePassword.isReadOnly)
+    assertFalse(truststorePassword.isReadOnly)
     val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
     assertEquals(servers(1).config.compressionType.toString, compressionType.value)
     assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
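
The flipped assertions reflect that these per-broker configs are now dynamically
updatable under KIP-226, which a client can observe through describeConfigs. A
minimal sketch, assuming an `adminClient` in scope and broker id "0":

    import java.util.Collections
    import org.apache.kafka.common.config.ConfigResource

    val resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
    val brokerConfig = adminClient.describeConfigs(Collections.singleton(resource))
      .all.get.get(resource)
    // listener.security.protocol.map is no longer reported as read-only
    assert(!brokerConfig.get("listener.security.protocol.map").isReadOnly)
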
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index f459252..273b247 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -59,12 +59,7 @@ trait SaslSetup {
       case _ => false
     })
     if (hasKerberos) {
-      val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
-      kdc = new MiniKdc(kdcConf, workDir)
-      kdc.start()
-      kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
-      kdc.createPrincipal(clientKeytabFile,
-        JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
+      initializeKerberos()
     }
     writeJaasConfigurationToFile(jaasSections)
     val hasZk = jaasSections.exists(_.modules.exists {
@@ -75,6 +70,15 @@ trait SaslSetup {
       System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
   }
 
+  protected def initializeKerberos(): Unit = {
+    val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
+    kdc = new MiniKdc(kdcConf, workDir)
+    kdc.start()
+    kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
+    kdc.createPrincipal(clientKeytabFile,
+      JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
+  }
+
   /** Return a tuple with the path to the server keytab file and client keytab file */
   protected def maybeCreateEmptyKeytabFiles(): (File, File) = {
     if (serverKeytabFile.isEmpty)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 1224274..b7f0ae8 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -18,28 +18,29 @@
 
 package kafka.server
 
-import java.io.{Closeable, File, FileOutputStream, FileWriter}
+import java.io.{Closeable, File, FileWriter}
 import java.nio.file.{Files, StandardCopyOption}
 import java.lang.management.ManagementFactory
 import java.util
 import java.util.{Collections, Properties}
-import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit}
+import java.util.concurrent._
 import javax.management.ObjectName
 
 import kafka.admin.ConfigCommand
-import kafka.api.SaslSetup
-import kafka.log.LogConfig
+import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.coordinator.group.OffsetConfig
+import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
-import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigException, ConfigResource, SslConfigs}
 import org.apache.kafka.common.config.SslConfigs._
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
@@ -55,6 +56,7 @@ import org.junit.{After, Before, Test}
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 
 object DynamicBrokerReconfigurationTest {
   val SecureInternal = "INTERNAL"
@@ -65,12 +67,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   import DynamicBrokerReconfigurationTest._
 
-  private var servers = new ArrayBuffer[KafkaServer]
+  private val servers = new ArrayBuffer[KafkaServer]
   private val numServers = 3
   private val producers = new ArrayBuffer[KafkaProducer[String, String]]
   private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
   private val adminClients = new ArrayBuffer[AdminClient]()
   private val clientThreads = new ArrayBuffer[ShutdownableThread]()
+  private val executors = new ArrayBuffer[ExecutorService]
   private val topic = "testtopic"
 
   private val kafkaClientSaslMechanism = "PLAIN"
@@ -95,10 +98,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
       props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
+      props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
       props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
       props.put(KafkaConfig.LogSegmentBytesProp, "2000")
       props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
+      props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
+      props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret")
 
       props ++= sslProperties1
       addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -127,8 +133,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     clientThreads.foreach(_.interrupt())
     clientThreads.foreach(_.initiateShutdown())
     clientThreads.foreach(_.join(5 * 1000))
-    producers.foreach(_.close())
-    consumers.foreach(_.close())
+    executors.foreach(_.shutdownNow())
+    producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
     adminClients.foreach(_.close())
     TestUtils.shutdownServers(servers)
     super.tearDown()
@@ -514,13 +521,217 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
+  @Test
+  def testAdvertisedListenerUpdate(): Unit = {
+    val adminClient = adminClients.head
+    val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal)
+
+    // Ensure connections are made to brokers before external listener is made inaccessible
+    describeConfig(externalAdminClient)
+
+    // Update advertised listener for the external listener to use an invalid address.
+    // Any address other than localhost is sufficient to fail (either connection or host name verification failure).
+    val invalidHost = "192.168.0.1"
+    alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
+
+    // Verify that producer connections fail since advertised listener is invalid
+    val bootstrap = bootstrapServers.replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed
+    val producer1 = createProducer(trustStoreFile1, retries = 0, bootstrap = bootstrap)
+
+    val sendFuture = verifyConnectionFailure(producer1)
+
+    alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
+
+    // Verify that produce/consume work now
+    val producer = createProducer(trustStoreFile1, retries = 0)
+    val consumer = createConsumer("group2", trustStoreFile1, topic)
+    verifyProduceConsume(producer, consumer, 10, topic)
+
+    // Verify updating inter-broker listener
+    val props = new Properties
+    props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
+    try {
+      reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
+      fail("Inter-broker listener cannot be dynamically updated")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(s"Unexpected exception ${e.getCause}", e.getCause.isInstanceOf[InvalidRequestException])
+        servers.foreach(server => assertEquals(SecureInternal, server.config.interBrokerListenerName.value))
+    }
+
+    // Verify that the other send did not complete
+    verifyTimeout(sendFuture)
+  }
+
+  @Test
+  def testAddRemoveSslListener(): Unit = {
+    verifyAddListener("SSL", SecurityProtocol.SSL, Seq.empty)
+
+    // Restart servers and check secret rotation
+    servers.foreach(_.shutdown())
+    servers.foreach(_.awaitShutdown())
+    adminClients.foreach(_.close())
+    adminClients.clear()
+
+    // All passwords are currently encoded with password.encoder.secret. Encode with password.encoder.old.secret
+    // and update ZK. When each server is started, it should decode using password.encoder.old.secret and update
+    // ZK with newly encoded values using password.encoder.secret.
+    servers.foreach { server =>
+      val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
+      val config = server.config
+      val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
+      val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured"))
+      val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains)
+      val passwordDecoder = new PasswordEncoder(secret,
+        config.passwordEncoderKeyFactoryAlgorithm,
+        config.passwordEncoderCipherAlgorithm,
+        config.passwordEncoderKeyLength,
+        config.passwordEncoderIterations)
+      val passwordEncoder = new PasswordEncoder(oldSecret,
+        config.passwordEncoderKeyFactoryAlgorithm,
+        config.passwordEncoderCipherAlgorithm,
+        config.passwordEncoderKeyLength,
+        config.passwordEncoderIterations)
+      passwordConfigs.foreach { case (name, value) =>
+          val decoded = passwordDecoder.decode(value).value
+          props.put(name, passwordEncoder.encode(new Password(decoded)))
+      }
+      val brokerId = server.config.brokerId
+      adminZkClient.changeBrokerConfig(Seq(brokerId), props)
+      val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
+      passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) }
+
+      server.startup()
+      TestUtils.retry(10000) {
+        val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
+        passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) }
+      }
+    }
+
+    verifyListener(SecurityProtocol.SSL, None)
+    createAdminClient(SecurityProtocol.SSL, SecureInternal)
+    verifyRemoveListener("SSL", SecurityProtocol.SSL, Seq.empty)
+  }
+
+  @Test
+  def testAddRemoveSaslListeners(): Unit = {
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+    initializeKerberos()
+
+    //verifyAddListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
+    verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
+    //verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
+    verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
+  }
+
+  private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
+                                saslMechanisms: Seq[String]): Unit = {
+    val config = servers.head.config
+    val existingListenerCount = config.listeners.size
+    val listeners = config.listeners
+      .map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
+      .mkString(",") + s",$listenerName://localhost:0"
+    val listenerMap = config.listenerSecurityProtocolMap
+      .map { case (name, protocol) => s"${name.value}:${protocol.name}" }
+      .mkString(",") + s",$listenerName:${securityProtocol.name}"
+
+    val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
+    securityProtocol match {
+      case SecurityProtocol.SSL =>
+        addListenerPropsSsl(listenerName, props)
+      case SecurityProtocol.SASL_PLAINTEXT =>
+        addListenerPropsSasl(listenerName, saslMechanisms, props)
+      case SecurityProtocol.SASL_SSL =>
+        addListenerPropsSasl(listenerName, saslMechanisms, props)
+        addListenerPropsSsl(listenerName, props)
+      case SecurityProtocol.PLAINTEXT => // no additional props
+    }
+
+    alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+
+    TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
+      "Listener config not updated")
+    TestUtils.waitUntilTrue(() => servers.forall(server => {
+      try {
+        server.socketServer.boundPort(new ListenerName(listenerName)) > 0
+      } catch {
+        case _: Exception => false
+      }
+    }), "Listener not created")
+
+    if (saslMechanisms.nonEmpty)
+      saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism)))
+    else
+      verifyListener(securityProtocol, None)
+  }
+
+  private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
+                                   saslMechanisms: Seq[String]): Unit = {
+    val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
+    val producer1 = createProducer(listenerName, securityProtocol, saslMechanism)
+    val consumer1 = createConsumer(listenerName, securityProtocol, saslMechanism,
+      s"remove-listener-group-$securityProtocol")
+    verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
+    // send another message to check consumer later
+    producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS)
+
+    val config = servers.head.config
+    val existingListenerCount = config.listeners.size
+    val listeners = config.listeners
+      .filter(e => e.listenerName.value != securityProtocol.name)
+      .map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
+      .mkString(",")
+    val listenerMap = config.listenerSecurityProtocolMap
+      .filterKeys(name => name.value != securityProtocol.name)
+      .map { case (name, protocol) => s"${name.value}:${protocol.name}" }
+      .mkString(",")
+
+    val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
+    val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix))
+    listenerProps.foreach(props.remove)
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
+    alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+
+    TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
+      "Listeners not updated")
+
+    // Test that connections using deleted listener don't work
+    val producerFuture = verifyConnectionFailure(producer1)
+    val consumerFuture = verifyConnectionFailure(consumer1)
+
+    // Test that other listeners still work
+    val producer2 = createProducer(trustStoreFile1, retries = 0)
+    val consumer2 = createConsumer(s"remove-listener-group2-$securityProtocol", trustStoreFile1, topic, autoOffsetReset = "latest")
+    verifyProduceConsume(producer2, consumer2, numRecords = 10, topic)
+
+    // Verify that producer/consumer using old listener don't work
+    verifyTimeout(producerFuture)
+    verifyTimeout(consumerFuture)
+  }
+
+  private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String]): Unit = {
+    val mechanism = saslMechanism.getOrElse("")
+    val producer = createProducer(securityProtocol.name, securityProtocol, mechanism)
+    val consumer = createConsumer(securityProtocol.name, securityProtocol, mechanism,
+      s"add-listener-group-$securityProtocol-$mechanism")
+    verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+  }
+
+  private def bootstrapServers: String = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+
   private def createProducer(trustStore: File, retries: Int,
-                             clientId: String = "test-producer"): KafkaProducer[String, String] = {
-    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+                             clientId: String = "test-producer",
+                             bootstrap: String = bootstrapServers,
+                             securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL): KafkaProducer[String, String] = {
     val propsOverride = new Properties
     propsOverride.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
+    propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
     val producer = TestUtils.createNewProducer(
-      bootstrapServers,
+      bootstrap,
       acks = -1,
       retries = retries,
-      securityProtocol = SecurityProtocol.SASL_SSL,
+      securityProtocol = securityProtocol,
@@ -533,17 +744,72 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     producer
   }
 
-  private def createConsumer(groupId: String, trustStore: File, topic: String = topic):KafkaConsumer[String, String] = {
-    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+  private def createConsumer(groupId: String, trustStore: File,
+                             topic: String = topic,
+                             bootstrap: String = bootstrapServers,
+                             securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL,
+                             autoOffsetReset: String = "earliest"): KafkaConsumer[String, String] = {
+    val propsOverride = new Properties
+    propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
     val consumer = TestUtils.createNewConsumer(
-      bootstrapServers,
+      bootstrap,
       groupId,
-      securityProtocol = SecurityProtocol.SASL_SSL,
+      autoOffsetReset = autoOffsetReset,
+      securityProtocol = securityProtocol,
       trustStoreFile = Some(trustStore),
       saslProperties = Some(clientSaslProps),
       keyDeserializer = new StringDeserializer,
       valueDeserializer = new StringDeserializer)
     consumer.subscribe(Collections.singleton(topic))
+    if (autoOffsetReset == "latest") {
+      do {
+        consumer.poll(1)
+      } while (consumer.assignment.isEmpty)
+    }
+    consumers += consumer
+    consumer
+  }
+
+  private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: String): Properties = {
+    val props = new Properties
+    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
+    props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
+    val saslProps = if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
+      Some(kafkaClientSaslProperties(saslMechanism, dynamicJaasConfig = true))
+    } else
+      None
+    val securityProps = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol,
+      Some(trustStoreFile1), "client", TestUtils.SslCertificateCn, saslProps)
+    props ++= securityProps
+    props
+  }
+
+  private def createProducer(listenerName: String, securityProtocol: SecurityProtocol,
+                             saslMechanism: String): KafkaProducer[String, String] = {
+    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
+    val producer = TestUtils.createNewProducer(bootstrapServers,
+      acks = -1, retries = 0,
+      securityProtocol = securityProtocol,
+      keySerializer = new StringSerializer,
+      valueSerializer = new StringSerializer,
+      props = Some(clientProps(securityProtocol, saslMechanism)))
+    producers += producer
+    producer
+  }
+
+  private def createConsumer(listenerName: String, securityProtocol: SecurityProtocol,
+                             saslMechanism: String, group: String): KafkaConsumer[String, String] = {
+    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
+    val consumer = TestUtils.createNewConsumer(bootstrapServers, group,
+      autoOffsetReset = "latest",
+      securityProtocol = securityProtocol,
+      keyDeserializer = new StringDeserializer,
+      valueDeserializer = new StringDeserializer,
+      props = Some(clientProps(securityProtocol, saslMechanism)))
+    consumer.subscribe(Collections.singleton(topic))
+    do {
+      consumer.poll(1)
+    } while (consumer.assignment.isEmpty)
     consumers += consumer
     consumer
   }
@@ -552,6 +818,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val config = new util.HashMap[String, Object]
     val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
     val securityProps: util.Map[Object, Object] =
       TestUtils.adminClientSecurityConfigs(securityProtocol, Some(trustStoreFile1), Some(clientSaslProps))
     securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
@@ -563,7 +830,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private def verifyProduceConsume(producer: KafkaProducer[String, String],
                                    consumer: KafkaConsumer[String, String],
                                    numRecords: Int,
-                                   topic: String = topic): Unit = {
+                                   topic: String): Unit = {
     val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
     producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
 
@@ -639,6 +906,39 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
   }
 
+  private def serverEndpoints(adminClient: AdminClient): String = {
+    val nodes = adminClient.describeCluster().nodes().get
+    nodes.asScala.map { node =>
+      s"${node.host}:${node.port}"
+    }.mkString(",")
+  }
+
+  private def alterAdvertisedListener(adminClient: AdminClient, externalAdminClient: AdminClient, oldHost: String, newHost: String): Unit = {
+    val configs = servers.map { server =>
+      val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+      val newListeners = server.config.advertisedListeners.map { e =>
+        if (e.listenerName.value == SecureExternal)
+          s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
+        else
+          s"${e.listenerName.value}://${e.host}:${server.boundPort(e.listenerName)}"
+      }.mkString(",")
+      val configEntry = new ConfigEntry(KafkaConfig.AdvertisedListenersProp, newListeners)
+      (resource, new Config(Collections.singleton(configEntry)))
+    }.toMap.asJava
+    adminClient.alterConfigs(configs).all.get
+    servers.foreach { server =>
+      TestUtils.retry(10000) {
+        val externalListener = server.config.advertisedListeners.find(_.listenerName.value == SecureExternal)
+          .getOrElse(throw new IllegalStateException("External listener not found"))
+        assertTrue("Config not updated", externalListener.host == newHost)
+      }
+    }
+    val (endpoints, altered) = TestUtils.computeUntilTrue(serverEndpoints(externalAdminClient)) { endpoints =>
+      !endpoints.contains(oldHost)
+    }
+    assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
+  }
+
   private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -701,12 +1001,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     adminZkClient.changeBrokerConfig(brokers, keystoreProps)
   }
 
-  private def waitForKeystore(sslProperties: Properties, maxWaitMs: Long = 10000): Unit = {
-    waitForConfig(new ListenerName(SecureExternal).configPrefix + SSL_KEYSTORE_LOCATION_CONFIG,
-      sslProperties.getProperty(SSL_KEYSTORE_LOCATION_CONFIG), maxWaitMs)
-
-  }
-
   private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
     servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) }
   }
@@ -774,6 +1068,64 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
   }
 
+  private def verifyConnectionFailure(producer: KafkaProducer[String, String]): Future[_] = {
+    val executor = Executors.newSingleThreadExecutor
+    executors += executor
+    val future = executor.submit(new Runnable() {
+      def run() {
+        producer.send(new ProducerRecord(topic, "key", "value")).get
+      }
+    })
+    verifyTimeout(future)
+    future
+  }
+
+  private def verifyConnectionFailure(consumer: KafkaConsumer[String, String]): Future[_] = {
+    val executor = Executors.newSingleThreadExecutor
+    executors += executor
+    val future = executor.submit(new Runnable() {
+      def run() {
+        assertEquals(0, consumer.poll(100).count)
+      }
+    })
+    verifyTimeout(future)
+    future
+  }
+
+  private def verifyTimeout(future: Future[_]): Unit = {
+    try {
+      future.get(100, TimeUnit.MILLISECONDS)
+      fail("Operation should not have completed")
+    } catch {
+      case _: TimeoutException => // expected exception
+    }
+  }
+
+  private def addListenerPropsSsl(listenerName: String, props: Properties): Unit = {
+    val prefix = new ListenerName(listenerName).configPrefix
+    sslProperties1.keySet.asScala.foreach { name =>
+      val value = sslProperties1.get(name)
+      val valueStr = value match {
+        case password: Password => password.value
+        case list: util.List[_] => list.asScala.map(_.toString).mkString(",")
+        case _ => value.toString
+      }
+      props.put(s"$prefix$name", valueStr)
+    }
+  }
+
+  private def addListenerPropsSasl(listener: String, mechanisms: Seq[String], props: Properties): Unit = {
+    val listenerName = new ListenerName(listener)
+    val prefix = listenerName.configPrefix
+    props.put(prefix + KafkaConfig.SaslEnabledMechanismsProp, mechanisms.mkString(","))
+    props.put(prefix + KafkaConfig.SaslKerberosServiceNameProp, "kafka")
+    mechanisms.foreach { mechanism =>
+      val jaasSection = jaasSections(Seq(mechanism), None, KafkaSasl, "").head
+      val jaasConfig = jaasSection.modules.head.toString
+      props.put(listenerName.saslMechanismConfigPrefix(mechanism) + KafkaConfig.SaslJaasConfigProp, jaasConfig)
+    }
+  }
+
   private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) {
     private val producer = createProducer(trustStoreFile1, retries, clientId)
     @volatile var sent = 0
@@ -877,8 +1229,10 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
     Set(PollingIntervalProp).asJava
   }
 
-  override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
-    configs.get(PollingIntervalProp).toString.toInt > 0
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+    val pollingInterval = configs.get(PollingIntervalProp).toString
+    if (pollingInterval.toInt <= 0)
+      throw new ConfigException(s"Invalid polling interval $pollingInterval")
   }
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
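
For context, the listener add performed by verifyAddListener boils down to a
single AlterConfigs call. A hedged sketch of the equivalent client-side update;
the broker id, ports, keytab path and principal are illustrative only:

    import java.util.{Arrays, Collections}
    import org.apache.kafka.clients.admin.{AdminClient, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    def addSaslListener(admin: AdminClient, brokerId: String): Unit = {
      val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
      val entries = Arrays.asList(
        new ConfigEntry("listeners",
          "INTERNAL://localhost:9092,EXTERNAL://localhost:9093,SASL_PLAINTEXT://localhost:0"),
        new ConfigEntry("listener.security.protocol.map",
          "INTERNAL:SSL,EXTERNAL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT"),
        new ConfigEntry("listener.name.sasl_plaintext.sasl.enabled.mechanisms", "GSSAPI"),
        new ConfigEntry("listener.name.sasl_plaintext.gssapi.sasl.jaas.config",
          "com.sun.security.auth.module.Krb5LoginModule required " +
            "useKeyTab=true keyTab=\"/tmp/kafka.keytab\" principal=\"kafka/localhost\";"))
      admin.alterConfigs(Collections.singletonMap(resource, new Config(entries))).all.get
    }
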
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
index 666ba4b..f2c6753 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
@@ -21,25 +21,23 @@ import java.util.Properties
 
 import kafka.utils.JaasTestUtils
 import kafka.utils.JaasTestUtils.JaasSection
-import org.apache.kafka.common.network.ListenerName
 
 class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
 
   import MultipleListenersWithSameSecurityProtocolBaseTest._
 
-  override def saslProperties(listenerName: ListenerName): Properties = {
-    listenerName.value match {
-      case SecureInternal => kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
-      case SecureExternal => kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true)
-      case _ => throw new IllegalArgumentException(s"Unexpected listener name $listenerName")
-    }
+  override def staticJaasSections: Seq[JaasSection] = {
+    val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
+    JaasTestUtils.zkSections :+
+      JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal), Some(serverKeytabFile))
   }
 
-  override def jaasSections: Seq[JaasSection] = {
-    val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
-    JaasTestUtils.zkSections ++ Seq(
-      JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", Seq(GssApi), Some(serverKeytabFile)),
-      JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(Plain), None)
-    )
+  override protected def dynamicJaasSections: Properties = {
+    val props = new Properties
+    kafkaServerSaslMechanisms(SecureInternal).foreach { mechanism =>
+      addDynamicJaasSection(props, SecureInternal, mechanism,
+        JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism), None))
+    }
+    props
   }
 }
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
index f3e1b8b..10df84e 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
@@ -26,12 +26,9 @@ import org.apache.kafka.common.network.ListenerName
 
 class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
 
-  import MultipleListenersWithSameSecurityProtocolBaseTest._
+  override def staticJaasSections: Seq[JaasSection] =
+    jaasSections(kafkaServerSaslMechanisms.values.flatMap(identity).toSeq, Some(kafkaClientSaslMechanism), Both)
 
-  override def saslProperties(listenerName: ListenerName): Properties =
-    kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
-
-  override def jaasSections: Seq[JaasSection] =
-    jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both)
+  override protected def dynamicJaasSections: Properties = new Properties
 
 }
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index da8437e..50ebaa4 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -19,13 +19,13 @@
 package kafka.server
 
 import java.io.File
-import java.util.{Collections, Properties}
+import java.util.{Collections, Objects, Properties}
 import java.util.concurrent.TimeUnit
 
 import kafka.api.SaslSetup
 import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
-import kafka.utils.TestUtils
+import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.utils.Implicits._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
@@ -55,18 +55,20 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
 
   private val trustStoreFile = File.createTempFile("truststore", ".jks")
   private val servers = new ArrayBuffer[KafkaServer]
-  private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]()
-  private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]()
+  private val producers = mutable.Map[ClientMetadata, KafkaProducer[Array[Byte], Array[Byte]]]()
+  private val consumers = mutable.Map[ClientMetadata, KafkaConsumer[Array[Byte], Array[Byte]]]()
 
   protected val kafkaClientSaslMechanism = Plain
-  protected val kafkaServerSaslMechanisms = List(GssApi, Plain)
+  protected val kafkaServerSaslMechanisms = Map(
+    SecureExternal -> Seq("SCRAM-SHA-256", GssApi),
+    SecureInternal -> Seq(Plain, "SCRAM-SHA-512"))
 
-  protected def saslProperties(listenerName: ListenerName): Properties
-  protected def jaasSections: Seq[JaasSection]
+  protected def staticJaasSections: Seq[JaasSection]
+  protected def dynamicJaasSections: Properties
 
   @Before
   override def setUp(): Unit = {
-    startSasl(jaasSections)
+    startSasl(staticJaasSections)
     super.setUp()
     // 2 brokers so that we can test that the data propagates correctly via UpdateMetadataRequest
     val numServers = 2
@@ -82,8 +84,12 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
       props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
       props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism)
-      props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
+      props.put(s"${new ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
+        kafkaServerSaslMechanisms(SecureInternal).mkString(","))
+      props.put(s"${new ListenerName(SecureExternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
+        kafkaServerSaslMechanisms(SecureExternal).mkString(","))
       props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
+      props ++= dynamicJaasSections
 
       props ++= TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")
 
@@ -107,26 +113,37 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
     TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
       replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
 
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+
     servers.head.config.listeners.foreach { endPoint =>
       val listenerName = endPoint.listenerName
 
-      TestUtils.createTopic(zkClient, listenerName.value, 2, 2, servers)
-
       val trustStoreFile =
         if (TestUtils.usesSslTransportLayer(endPoint.securityProtocol)) Some(this.trustStoreFile)
         else None
 
-      val saslProps =
-        if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) Some(saslProperties(listenerName))
-        else None
-
       val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
 
-      producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
-        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+      def addProducerConsumer(listenerName: ListenerName, mechanism: String, saslProps: Option[Properties]): Unit = {
+
+        val topic = s"${listenerName.value}${producers.size}"
+        TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+        val clientMetadata = ClientMetadata(listenerName, mechanism, topic)
+
+        producers(clientMetadata) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
+          securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+
+        consumers(clientMetadata) = TestUtils.createNewConsumer(bootstrapServers, groupId = clientMetadata.toString,
+          securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+      }
 
-      consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value,
-        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
+      if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
+        kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach { mechanism =>
+          addProducerConsumer(listenerName, mechanism, Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true)))
+        }
+      } else {
+        addProducerConsumer(listenerName, "", saslProps = None)
+      }
     }
   }
 
@@ -145,18 +162,34 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
     */
   @Test
   def testProduceConsume(): Unit = {
-    producers.foreach { case (listenerName, producer) =>
-      val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes,
+    producers.foreach { case (clientMetadata, producer) =>
+      val producerRecords = (1 to 10).map(i => new ProducerRecord(clientMetadata.topic, s"key$i".getBytes,
         s"value$i".getBytes))
       producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
 
-      val consumer = consumers(listenerName)
-      consumer.subscribe(Collections.singleton(listenerName.value))
+      val consumer = consumers(clientMetadata)
+      consumer.subscribe(Collections.singleton(clientMetadata.topic))
       val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
       TestUtils.waitUntilTrue(() => {
         records ++= consumer.poll(50).asScala
         records.size == producerRecords.size
-      }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records")
+      }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records with mechanism ${clientMetadata.saslMechanism}")
+    }
+  }
+
+  protected def addDynamicJaasSection(props: Properties, listener: String, mechanism: String, jaasSection: JaasSection): Unit = {
+    val listenerName = new ListenerName(listener)
+    val prefix = listenerName.saslMechanismConfigPrefix(mechanism)
+    val jaasConfig = jaasSection.modules.head.toString
+    props.put(s"${prefix}${KafkaConfig.SaslJaasConfigProp}", jaasConfig)
+  }
+
+  case class ClientMetadata(listenerName: ListenerName, saslMechanism: String, topic: String) {
+    override def hashCode: Int = Objects.hash(listenerName, saslMechanism)
+    override def equals(obj: Any): Boolean = obj match {
+      case other: ClientMetadata => listenerName == other.listenerName && saslMechanism == other.saslMechanism && topic == other.topic
+      case _ => false
     }
+    override def toString: String = s"${listenerName.value}:$saslMechanism:$topic"
   }
 }
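
The per-listener SASL configuration above hinges on the config prefixes exposed
by ListenerName. A small sketch of the naming convention (the lower-casing of
the prefix is assumed from configPrefix's behaviour):

    import org.apache.kafka.common.network.ListenerName

    val listener = new ListenerName("SECURE_INTERNAL")
    println(listener.configPrefix)                       // listener.name.secure_internal.
    println(listener.saslMechanismConfigPrefix("PLAIN")) // listener.name.secure_internal.plain.
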
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index c934c4a..18be167 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -87,13 +87,13 @@ class KafkaTest {
     val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password",
                                                                                     "--override", "ssl.key.password=key_password",
                                                                                     "--override", "ssl.truststore.password=truststore_password")))
-    assertEquals(Password.HIDDEN, config.sslKeyPassword.toString)
-    assertEquals(Password.HIDDEN, config.sslKeystorePassword.toString)
-    assertEquals(Password.HIDDEN, config.sslTruststorePassword.toString)
+    assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString)
+    assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString)
+    assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString)
 
-    assertEquals("key_password", config.sslKeyPassword.value)
-    assertEquals("keystore_password", config.sslKeystorePassword.value)
-    assertEquals("truststore_password", config.sslTruststorePassword.value)
+    assertEquals("key_password", config.getPassword(KafkaConfig.SslKeyPasswordProp).value)
+    assertEquals("keystore_password", config.getPassword(KafkaConfig.SslKeystorePasswordProp).value)
+    assertEquals("truststore_password", config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
   }
 
   def prepareDefaultConfig(): String = {
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 6e78423..b3c46fa 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -27,10 +27,10 @@ import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.internals.KafkaFutureImpl
-import org.apache.kafka.common.{KafkaFuture, Node}
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.security.scram.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
-import org.easymock.{EasyMock, IAnswer}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 13299c7..724e05b 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.scram.ScramMechanism
 import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
 import org.apache.log4j.Level
 import org.junit.Assert._
@@ -61,7 +62,7 @@ class SocketServerTest extends JUnitSuite {
   props.put("connections.max.idle.ms", "60000")
   val config = KafkaConfig.fromProps(props)
   val metrics = new Metrics
-  val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, null)
+  val credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, null)
   val localAddress = InetAddress.getLoopbackAddress
 
   // Clean-up any metrics left around by previous tests
@@ -406,7 +407,7 @@ class SocketServerTest extends JUnitSuite {
     // the following sleep is necessary to reliably detect the connection close when we send data below
     Thread.sleep(200L)
     // make sure the sockets are open
-    server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
+    server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
     // then shutdown the server
     shutdownServerAndMetrics(server)
 
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 6dedbe0..2e3e274 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -17,13 +17,18 @@
 
 package kafka.server
 
+import java.util
 import java.util.Properties
 
 import kafka.utils.TestUtils
+import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+
 class DynamicBrokerConfigTest {
 
   @Test
@@ -34,7 +39,7 @@ class DynamicBrokerConfigTest {
     val config = KafkaConfig(props)
     val dynamicConfig = config.dynamicConfig
     assertSame(config, dynamicConfig.currentKafkaConfig)
-    assertEquals(oldKeystore, config.sslKeystoreLocation)
+    assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
     assertEquals(oldKeystore,
       config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
     assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -55,7 +60,7 @@ class DynamicBrokerConfigTest {
       assertEquals(newKeystore,
         config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
 
-      assertEquals(oldKeystore, config.sslKeystoreLocation)
+      assertEquals(oldKeystore, config.getString(KafkaConfig.SslKeystoreLocationProp))
       assertEquals(oldKeystore, config.originals.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
       assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
       assertEquals(oldKeystore, config.originalsStrings.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -75,38 +80,48 @@ class DynamicBrokerConfigTest {
     origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
     val config = KafkaConfig(origProps)
 
-    def verifyConfigUpdateWithInvalidConfig(validProps: Map[String, String], invalidProps: Map[String, String]): Unit = {
-      val props = new Properties
-      validProps.foreach { case (k, v) => props.put(k, v) }
-      invalidProps.foreach { case (k, v) => props.put(k, v) }
-
-      // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
-      // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
-      try {
-        config.dynamicConfig.validate(props, perBrokerConfig = true)
-        fail("Invalid config did not fail validation")
-      } catch {
-        case e: ConfigException => // expected exception
-      }
-
-      // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
-      // startup and when configs are updated in ZK. Update should apply valid configs and ignore
-      // invalid ones.
-      config.dynamicConfig.updateBrokerConfig(0, props)
-      validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
-      invalidProps.keySet.foreach { name =>
-        assertEquals(origProps.get(name), config.originals.get(name))
-      }
-    }
+    val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12")
 
-    val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" ->"ks.p12")
     val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
-    verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix)
+    verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
     val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
-    verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
+    verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)
 
+    // Test update of configs with invalid type
     val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
-    verifyConfigUpdateWithInvalidConfig(validProps, invalidProps)
+    verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
+  }
+
+  @Test
+  def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
+    val config = KafkaConfig(origProps)
+    val validProps = Map.empty[String, String]
+    val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
+
+    def validateLogCleanerConfig(configs: util.Map[String, _]): Unit = {
+      val cleanerThreads = configs.get(KafkaConfig.LogCleanerThreadsProp).toString.toInt
+      if (cleanerThreads <= 0 || cleanerThreads >= 5)
+        throw new ConfigException(s"Invalid cleaner threads $cleanerThreads")
+    }
+    val reconfigurable = new Reconfigurable {
+      override def configure(configs: util.Map[String, _]): Unit = {}
+      override def reconfigurableConfigs(): util.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp).asJava
+      override def validateReconfiguration(configs: util.Map[String, _]): Unit = validateLogCleanerConfig(configs)
+      override def reconfigure(configs: util.Map[String, _]): Unit = {}
+    }
+    config.dynamicConfig.addReconfigurable(reconfigurable)
+    verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
+    config.dynamicConfig.removeReconfigurable(reconfigurable)
+
+    val brokerReconfigurable = new BrokerReconfigurable {
+      override def reconfigurableConfigs: collection.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp)
+      override def validateReconfiguration(newConfig: KafkaConfig): Unit = validateLogCleanerConfig(newConfig.originals)
+      override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {}
+    }
+    config.dynamicConfig.addBrokerReconfigurable(brokerReconfigurable)
+    verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
   }
 
   @Test
@@ -151,4 +166,86 @@ class DynamicBrokerConfigTest {
       assertEquals(oldValue, config.originals.get(name))
     }
   }
+
+  private def verifyConfigUpdateWithInvalidConfig(config: KafkaConfig,
+                                                  origProps: Properties,
+                                                  validProps: Map[String, String],
+                                                  invalidProps: Map[String, String]): Unit = {
+    val props = new Properties
+    validProps.foreach { case (k, v) => props.put(k, v) }
+    invalidProps.foreach { case (k, v) => props.put(k, v) }
+
+    // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
+    // an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
+    try {
+      config.dynamicConfig.validate(props, perBrokerConfig = true)
+      fail("Invalid config did not fail validation")
+    } catch {
+      case _: ConfigException => // expected exception
+    }
+
+    // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
+    // startup and when configs are updated in ZK. Update should apply valid configs and ignore
+    // invalid ones.
+    config.dynamicConfig.updateBrokerConfig(0, props)
+    validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
+    invalidProps.keySet.foreach { name =>
+      assertEquals(origProps.get(name), config.originals.get(name))
+    }
+  }
+
+  @Test
+  def testPasswordConfigEncryption(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val configWithoutSecret = KafkaConfig(props)
+    props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
+    val configWithSecret = KafkaConfig(props)
+    val dynamicProps = new Properties
+    dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "myLoginModule required;")
+
+    try {
+      configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
+      fail("Password encoding should fail without a password encoder secret")
+    } catch {
+      case _: ConfigException => // expected exception
+    }
+    val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
+    assertFalse("Password not encoded",
+      persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("myLoginModule"))
+    val decodedProps = configWithSecret.dynamicConfig.fromPersistentProps(persistedProps, perBrokerConfig = true)
+    assertEquals("myLoginModule required;", decodedProps.getProperty(KafkaConfig.SaslJaasConfigProp))
+  }
+
+  @Test
+  def testPasswordConfigEncoderSecretChange(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.SaslJaasConfigProp, "staticLoginModule required;")
+    props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
+    val config = KafkaConfig(props)
+    val dynamicProps = new Properties
+    dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "dynamicLoginModule required;")
+
+    val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
+    assertFalse("Password not encoded",
+      persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("LoginModule"))
+    config.dynamicConfig.updateBrokerConfig(0, persistedProps)
+    assertEquals("dynamicLoginModule required;", config.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+
+    // New config with same secret should use the dynamic password config
+    val newConfigWithSameSecret = KafkaConfig(props)
+    newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
+    assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+
+    // New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
+    props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret")
+    props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret")
+    val newConfigWithNewAndOldSecret = KafkaConfig(props)
+    newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
+    assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+
+    // New config with new secret alone should revert to static password config since dynamic config cannot be decoded
+    props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret")
+    val newConfigWithNewSecret = KafkaConfig(props)
+    newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
+    assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
+  }
 }
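Taken together, the two password tests above pin down the intended round-trip: toPersistentProps encrypts password configs on the way into ZooKeeper, and fromPersistentProps restores the plaintext on the way back out, provided the encoder secret (or the configured old secret) still matches. A condensed sketch of that flow, built only from the APIs the tests themselves call (not part of the patch):

    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
    props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
    val config = KafkaConfig(props)

    val dynamicProps = new Properties
    dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "myLoginModule required;")

    // Encode before persisting: the stored value must not leak the plaintext.
    val persisted = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
    assert(!persisted.getProperty(KafkaConfig.SaslJaasConfigProp).contains("myLoginModule"))

    // Decode when reading back: the original plaintext round-trips.
    val decoded = config.dynamicConfig.fromPersistentProps(persisted, perBrokerConfig = true)
    assert(decoded.getProperty(KafkaConfig.SaslJaasConfigProp) == "myLoginModule required;")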
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 748efec..6b26334 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -674,12 +674,22 @@ class KafkaConfigTest {
         case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
         case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
         case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
+        case KafkaConfig.SaslJaasConfigProp =>
+
+        // Password encoder configs
+        case KafkaConfig.PasswordEncoderSecretProp =>
+        case KafkaConfig.PasswordEncoderOldSecretProp =>
+        case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp =>
+        case KafkaConfig.PasswordEncoderCipherAlgorithmProp =>
+        case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1", "0")
+        case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1", "0")
 
         //delegation token configs
         case KafkaConfig.DelegationTokenMasterKeyProp => // ignore
         case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+
         case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
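Of the new password encoder configs, only the key length and iteration count carry range checks; the secrets and algorithm names accept arbitrary strings. A hedged sketch of what assertPropertyInvalid exercises for the two numeric props, assuming KafkaConfig construction fails fast with a ConfigException on an out-of-range value, as it does for the other numeric configs in this match:

    val props = getBaseProperties()
    props.put(KafkaConfig.PasswordEncoderKeyLengthProp, "0") // "-1" and "not_a_number" should fail the same way
    try {
      KafkaConfig(props)
      fail("Expected ConfigException for a non-positive key length")
    } catch {
      case _: ConfigException => // expected
    }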
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
new file mode 100755
index 0000000..11a2a7a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import javax.crypto.SecretKeyFactory
+
+import kafka.server.Defaults
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.utils.Java
+import org.junit.Assert._
+import org.junit.Test
+
+class PasswordEncoderTest {
+
+  @Test
+  def testEncodeDecode(): Unit = {
+    val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+      None,
+      Defaults.PasswordEncoderCipherAlgorithm,
+      Defaults.PasswordEncoderKeyLength,
+      Defaults.PasswordEncoderIterations)
+    val password = "test-password"
+    val encoded = encoder.encode(new Password(password))
+    val encodedMap = CoreUtils.parseCsvMap(encoded)
+    assertEquals("4096", encodedMap(PasswordEncoder.IterationsProp))
+    assertEquals("128", encodedMap(PasswordEncoder.KeyLengthProp))
+    val defaultKeyFactoryAlgorithm = try {
+      SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
+      "PBKDF2WithHmacSHA512"
+    } catch {
+      case _: Exception => "PBKDF2WithHmacSHA1"
+    }
+    assertEquals(defaultKeyFactoryAlgorithm, encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
+    assertEquals("AES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
+
+    verifyEncodedPassword(encoder, password, encoded)
+  }
+
+  @Test
+  def testEncoderConfigChange(): Unit = {
+    val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+      Some("PBKDF2WithHmacSHA1"),
+      "DES/CBC/PKCS5Padding",
+      64,
+      1024)
+    val password = "test-password"
+    val encoded = encoder.encode(new Password(password))
+    val encodedMap = CoreUtils.parseCsvMap(encoded)
+    assertEquals("1024", encodedMap(PasswordEncoder.IterationsProp))
+    assertEquals("64", encodedMap(PasswordEncoder.KeyLengthProp))
+    assertEquals("PBKDF2WithHmacSHA1", encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
+    assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
+
+    // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
+    val decoder = new PasswordEncoder(new Password("password-encoder-secret"),
+      Some("PBKDF2WithHmacSHA1"),
+      "AES/CBC/PKCS5Padding",
+      128,
+      2048)
+    assertEquals(password, decoder.decode(encoded).value)
+
+    // Test that decoding fails if secret is altered
+    val decoder2 = new PasswordEncoder(new Password("secret-2"),
+      Some("PBKDF2WithHmacSHA1"),
+      "AES/CBC/PKCS5Padding",
+      128,
+      1024)
+    try {
+      decoder2.decode(encoded)
+      fail("Expected decoding to fail when the secret is altered")
+    } catch {
+      case _: ConfigException => // expected exception
+    }
+  }
+
+  @Test
+  def testEncodeDecodeAlgorithms(): Unit = {
+
+    def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
+      val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+        keyFactoryAlg,
+        cipherAlg,
+        keyLength,
+        Defaults.PasswordEncoderIterations)
+      val password = "test-password"
+      val encoded = encoder.encode(new Password(password))
+      verifyEncodedPassword(encoder, password, encoded)
+    }
+
+    verifyEncodeDecode(keyFactoryAlg = None, "DES/CBC/PKCS5Padding", keyLength = 64)
+    verifyEncodeDecode(keyFactoryAlg = None, "DESede/CBC/PKCS5Padding", keyLength = 192)
+    verifyEncodeDecode(keyFactoryAlg = None, "AES/CBC/PKCS5Padding", keyLength = 128)
+    verifyEncodeDecode(keyFactoryAlg = None, "AES/CFB/PKCS5Padding", keyLength = 128)
+    verifyEncodeDecode(keyFactoryAlg = None, "AES/OFB/PKCS5Padding", keyLength = 128)
+    verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA1"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
+    if (Java.IS_JAVA8_COMPATIBLE) {
+      verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/PKCS5Padding", keyLength = 128)
+      verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
+      verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
+    }
+  }
+
+  private def verifyEncodedPassword(encoder: PasswordEncoder, password: String, encoded: String): Unit = {
+    val encodedMap = CoreUtils.parseCsvMap(encoded)
+    assertEquals(password.length.toString, encodedMap(PasswordEncoder.PasswordLengthProp))
+    assertNotNull("Invalid salt", encoder.base64Decode(encodedMap("salt")))
+    assertNotNull("Invalid encoding parameters", encoder.base64Decode(encodedMap(PasswordEncoder.InitializationVectorProp)))
+    assertNotNull("Invalid encoded password", encoder.base64Decode(encodedMap(PasswordEncoder.EncyrptedPasswordProp)))
+    assertEquals(password, encoder.decode(encoded).value)
+  }
+}
\ No newline at end of file
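For orientation, the assertions in this new test imply the encoded format: encode produces a CSV map (parsed back via CoreUtils.parseCsvMap) that stores the encoding parameters next to the base64 payloads, which is why a decoder still works after the encoder's defaults change. A sketch assembled only from calls the test itself makes; the concrete key strings are whatever the PasswordEncoder.*Prop constants resolve to:

    val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
      None,
      Defaults.PasswordEncoderCipherAlgorithm,
      Defaults.PasswordEncoderKeyLength,
      Defaults.PasswordEncoderIterations)
    val encoded = encoder.encode(new Password("test-password"))
    val fields = CoreUtils.parseCsvMap(encoded)
    // Parameters stored alongside the ciphertext:
    //   fields(PasswordEncoder.KeyFactoryAlgorithmProp), fields(PasswordEncoder.CipherAlgorithmProp),
    //   fields(PasswordEncoder.KeyLengthProp) == "128", fields(PasswordEncoder.IterationsProp) == "4096"
    // Base64-encoded payloads:
    //   fields("salt"), fields(PasswordEncoder.InitializationVectorProp), fields(PasswordEncoder.EncyrptedPasswordProp)
    assert(encoder.decode(encoded).value == "test-password")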
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 407bdb5..303afa7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -718,7 +718,7 @@ object TestUtils extends Logging {
 
   def deleteBrokersInZk(zkClient: KafkaZkClient, ids: Seq[Int]): Seq[Broker] = {
     val brokers = ids.map(createBroker(_, "localhost", 6667, SecurityProtocol.PLAINTEXT))
-    brokers.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b))
+    ids.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b))
     brokers
   }
 
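The one-line TestUtils fix above is easy to miss: iterating over brokers appended the Broker object's string form to the znode path, so the delete targeted a path that never existed; iterating over ids yields the numeric id that broker registration actually uses. Illustration (path value assumed from the BrokerIdsZNode convention):

    // before: BrokerIdsZNode.path + "/" + broker  // e.g. "/brokers/ids/<Broker.toString>" -- no such znode
    // after:  BrokerIdsZNode.path + "/" + id      // e.g. "/brokers/ids/0"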

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.