You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/03 00:32:01 UTC

[kafka] branch trunk updated: KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 505adef  KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503)
505adef is described below

commit 505adefcecc6e7282b73f934aa8c7a59adcb675e
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Feb 2 16:31:58 2018 -0800

    KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503)
    
    Use new AdminClient for describing and altering broker configs using ConfigCommand. Broker quota configs, as well as configs for other entity types (topics, clients, users), will continue to be processed directly using ZooKeeper until KIP-248 is implemented.
    
    Reviewers: Manikumar Reddy O <ma...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../common/requests/DescribeConfigsResponse.java   |   2 +-
 .../kafka/common/requests/RequestResponseTest.java |  33 ++++
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 173 ++++++++++++++++++---
 .../src/main/scala/kafka/server/AdminManager.scala |  33 ++--
 .../server/DynamicBrokerReconfigurationTest.scala  |  95 ++++++-----
 .../scala/unit/kafka/admin/ConfigCommandTest.scala |  88 ++++++++++-
 6 files changed, 341 insertions(+), 83 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 62012f4..e463618 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -74,7 +74,7 @@ public class DescribeConfigsResponse extends AbstractResponse {
             new Field(CONFIG_NAME_KEY_NAME, STRING),
             new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
             new Field(READ_ONLY_KEY_NAME, BOOLEAN),
-            new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
+            new Field(CONFIG_SOURCE_KEY_NAME, INT8),
             new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
             new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)));
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b5420b5..0f7429e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -65,12 +65,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.test.TestUtils.toBuffer;
@@ -253,6 +255,7 @@ public class RequestResponseTest {
         checkRequest(createDescribeConfigsRequestWithConfigEntries(1));
         checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException());
         checkResponse(createDescribeConfigsResponse(), 1);
+        checkDescribeConfigsResponseVersions();
         checkRequest(createCreatePartitionsRequest());
         checkRequest(createCreatePartitionsRequestWithAssignments());
         checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
@@ -288,6 +291,36 @@ public class RequestResponseTest {
         }
     }
 
+    private void verifyDescribeConfigsResponse(DescribeConfigsResponse expected, DescribeConfigsResponse actual, int version) throws Exception {
+        for (org.apache.kafka.common.requests.Resource resource : expected.configs().keySet()) {
+            Collection<DescribeConfigsResponse.ConfigEntry> deserializedEntries1 = actual.config(resource).entries();
+            Iterator<DescribeConfigsResponse.ConfigEntry> expectedEntries = expected.config(resource).entries().iterator();
+            for (DescribeConfigsResponse.ConfigEntry entry : deserializedEntries1) {
+                DescribeConfigsResponse.ConfigEntry expectedEntry = expectedEntries.next();
+                assertEquals(expectedEntry.name(), entry.name());
+                assertEquals(expectedEntry.value(), entry.value());
+                assertEquals(expectedEntry.isReadOnly(), entry.isReadOnly());
+                assertEquals(expectedEntry.isSensitive(), entry.isSensitive());
+                if (version == 1 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG &&
+                        expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
+                    assertEquals(expectedEntry.source(), entry.source());
+                else
+                    assertEquals(DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG, entry.source());
+            }
+        }
+    }
+
+    private void checkDescribeConfigsResponseVersions() throws Exception {
+        DescribeConfigsResponse response = createDescribeConfigsResponse();
+        DescribeConfigsResponse deserialized0 = (DescribeConfigsResponse) deserialize(response,
+                response.toStruct((short) 0), (short) 0);
+        verifyDescribeConfigsResponse(response, deserialized0, 0);
+
+        DescribeConfigsResponse deserialized1 = (DescribeConfigsResponse) deserialize(response,
+                response.toStruct((short) 1), (short) 1);
+        verifyDescribeConfigsResponse(response, deserialized1, 1);
+    }
+
     private void checkErrorResponse(AbstractRequest req, Throwable e) throws Exception {
         checkResponse(req.getErrorResponse(e), req.version());
     }
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index cf01a5f..9034dba 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -17,7 +17,8 @@
 
 package kafka.admin
 
-import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.util.{Collections, Properties}
 
 import joptsimple._
 import kafka.common.Config
@@ -27,6 +28,9 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils.CommandLineUtils
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AlterConfigsOptions, Config => JConfig, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient}
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -52,6 +56,13 @@ import scala.collection.JavaConverters._
 object ConfigCommand extends Config {
 
   val DefaultScramIterations = 4096
+  // Dynamic broker configs can only be updated using the new AdminClient since they may require
+  // password encryption currently implemented only in the broker. For consistency with older versions,
+  // quota-related broker configs can still be updated using ZooKeeper. ConfigCommand will be migrated
+  // fully to the new AdminClient later (KIP-248).
+  val BrokerConfigsUpdatableUsingZooKeeper = Set(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
+    DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
+    DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
 
   def main(args: Array[String]): Unit = {
 
@@ -63,21 +74,26 @@ object ConfigCommand extends Config {
     opts.checkArgs()
 
     val time = Time.SYSTEM
-    val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
-      Int.MaxValue, time)
-    val adminZkClient = new AdminZkClient(zkClient)
 
-    try {
-      if (opts.options.has(opts.alterOpt))
-        alterConfig(zkClient, opts, adminZkClient)
-      else if (opts.options.has(opts.describeOpt))
-        describeConfig(zkClient, opts, adminZkClient)
-    } catch {
-      case e: Throwable =>
-        println("Error while executing config command " + e.getMessage)
-        println(Utils.stackTrace(e))
-    } finally {
-      zkClient.close()
+    if (opts.options.has(opts.zkConnectOpt)) {
+      val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
+        Int.MaxValue, time)
+      val adminZkClient = new AdminZkClient(zkClient)
+
+      try {
+        if (opts.options.has(opts.alterOpt))
+          alterConfig(zkClient, opts, adminZkClient)
+        else if (opts.options.has(opts.describeOpt))
+          describeConfig(zkClient, opts, adminZkClient)
+      } catch {
+        case e: Throwable =>
+          println("Error while executing config command " + e.getMessage)
+          println(Utils.stackTrace(e))
+      } finally {
+        zkClient.close()
+      }
+    } else {
+      processBrokerConfig(opts)
     }
   }
 
@@ -90,6 +106,10 @@ object ConfigCommand extends Config {
 
     if (entityType == ConfigType.User)
       preProcessScramCredentials(configsToBeAdded)
+    if (entityType == ConfigType.Broker) {
+      require(configsToBeAdded.asScala.keySet.forall(BrokerConfigsUpdatableUsingZooKeeper.contains),
+        s"--bootstrap-server option must be specified to update broker configs $configsToBeAdded")
+    }
 
     // compile the final set of configs
     val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
@@ -172,6 +192,95 @@ object ConfigCommand extends Config {
       Seq.empty
   }
 
+  private def processBrokerConfig(opts: ConfigCommandOptions): Unit = {
+    val props = if (opts.options.has(opts.commandConfigOpt))
+      Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+    else
+      new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+    val adminClient = JAdminClient.create(props)
+    val entityName = if (opts.options.has(opts.entityName))
+      opts.options.valueOf(opts.entityName)
+    else if (opts.options.has(opts.entityDefault))
+      ""
+    else
+      throw new IllegalArgumentException("At least one of --entity-name or --entity-default must be specified with --bootstrap-server")
+
+    val entityTypes = opts.options.valuesOf(opts.entityType).asScala
+    if (entityTypes.size != 1)
+      throw new IllegalArgumentException("Exactly one --entity-type must be specified with --bootstrap-server")
+    if (entityTypes.head != ConfigType.Broker)
+      throw new IllegalArgumentException(s"--zookeeper option must be specified for entity-type $entityTypes")
+
+    try {
+      if (opts.options.has(opts.alterOpt))
+        alterBrokerConfig(adminClient, opts, entityName)
+      else if (opts.options.has(opts.describeOpt))
+        describeBrokerConfig(adminClient, opts, entityName)
+    } catch {
+      case e: Throwable =>
+        println("Error while executing config command " + e.getMessage)
+        println(Utils.stackTrace(e))
+    } finally {
+      adminClient.close()
+    }
+
+  }
+
+  private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
+    val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
+    val configsToBeDeleted = parseConfigsToBeDeleted(opts)
+
+    // compile the final set of configs
+    val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+    val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false)
+        .map { entry => (entry.name, entry) }.toMap
+
+    // fail the command if any of the configs to be deleted does not exist
+    val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+    if (invalidConfigs.nonEmpty)
+      throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+
+    val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+    val sensitiveEntries = newEntries.filter(_._2.value == null)
+    if (sensitiveEntries.nonEmpty)
+      throw new InvalidConfigException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
+    val newConfig = new JConfig(newEntries.asJava.values)
+
+    val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+    adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+
+    if (entityName.nonEmpty)
+      println(s"Completed updating config for broker: $entityName.")
+    else
+      println(s"Completed updating default config for brokers in the cluster.")
+  }
+
+  private def describeBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
+    val configs = brokerConfig(adminClient, entityName, includeSynonyms = true)
+    if (entityName.nonEmpty)
+      println(s"Configs for broker $entityName are:")
+    else
+      println(s"Default config for brokers in the cluster are:")
+    configs.foreach { config =>
+      val synonyms = config.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ")
+      println(s"  ${config.name}=${config.value} sensitive=${config.isSensitive} synonyms={$synonyms}")
+    }
+  }
+
+  private def brokerConfig(adminClient: JAdminClient, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = {
+    val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+    val configSource = if (!entityName.isEmpty)
+      ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG
+    else
+      ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG
+    val describeOpts = new DescribeConfigsOptions().includeSynonyms(includeSynonyms)
+    val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOpts).all.get(30, TimeUnit.SECONDS)
+    configs.get(configResource).entries.asScala
+      .filter(entry => entry.source == configSource)
+      .toSeq
+  }
+
   case class Entity(entityType: String, sanitizedName: Option[String]) {
     val entityPath = sanitizedName match {
       case Some(n) => entityType + "/" + n
@@ -249,12 +358,16 @@ object ConfigCommand extends Config {
     }
   }
 
+  private def entityNames(opts: ConfigCommandOptions): Seq[String] = {
+    val namesIterator = opts.options.valuesOf(opts.entityName).iterator
+    opts.options.specs.asScala
+      .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
+      .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "")
+  }
+
   private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = {
     val types = opts.options.valuesOf(opts.entityType).asScala
-    val namesIterator = opts.options.valuesOf(opts.entityName).iterator
-    val names = opts.options.specs.asScala
-                    .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
-                    .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "")
+    val names = entityNames(opts)
 
     if (opts.options.has(opts.alterOpt) && names.size != types.size)
       throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
@@ -285,6 +398,16 @@ object ConfigCommand extends Config {
             .withRequiredArg
             .describedAs("urls")
             .ofType(classOf[String])
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka server to connect to. " +
+      "This is required for describing and altering broker configs.")
+      .withRequiredArg
+      .describedAs("server to connect to")
+      .ofType(classOf[String])
+    val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
+      "This is used only with --bootstrap-server option for describing and altering broker configs.")
+      .withRequiredArg
+      .describedAs("command config property file")
+      .ofType(classOf[String])
     val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
     val describeOpt = parser.accepts("describe", "List configs for the given entity.")
     val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)")
@@ -293,7 +416,7 @@ object ConfigCommand extends Config {
     val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)")
             .withRequiredArg
             .ofType(classOf[String])
-    val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users (applies to corresponding entity type in command line)")
+    val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users/brokers (applies to corresponding entity type in command line)")
 
     val nl = System.getProperty("line.separator")
     val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
@@ -321,14 +444,18 @@ object ConfigCommand extends Config {
         CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
 
       // check required args
-      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
       CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
       val entityTypeVals = options.valuesOf(entityType).asScala
+
+      if (options.has(bootstrapServerOpt) == options.has(zkConnectOpt))
+        throw new IllegalArgumentException("Exactly one of --bootstrap-server or --zookeeper must be specified")
+      if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Topic) || entityTypeVals.contains(ConfigType.User))
+        CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
       if(options.has(alterOpt)) {
-        if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client)) {
+        if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Broker)) {
           if (!options.has(entityName) && !options.has(entityDefault))
-            throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users/clients")
+            throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users, clients or brokers")
         } else if (!options.has(entityName))
             throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}")
 
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 596dde0..8264f7c 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -284,9 +284,12 @@ class AdminManager(val config: KafkaConfig,
   def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]], includeSynonyms: Boolean): Map[Resource, DescribeConfigsResponse.Config] = {
     resourceToConfigNames.map { case (resource, configNames) =>
 
-      def createResponseConfig(config: AbstractConfig, createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
-        val allConfigs = config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
-        val filteredConfigPairs = allConfigs.filter { case (configName, _) =>
+      def allConfigs(config: AbstractConfig) = {
+        config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
+      }
+      def createResponseConfig(configs: Map[String, Any],
+                               createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
+        val filteredConfigPairs = configs.filter { case (configName, _) =>
           /* Always returns true if configNames is None */
           configNames.forall(_.contains(configName))
         }.toIndexedSeq
@@ -304,14 +307,17 @@ class AdminManager(val config: KafkaConfig,
             // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
             val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
             val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
-            createResponseConfig(logConfig, createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+            createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
 
           case ResourceType.BROKER =>
-            val brokerId = resourceNameToBrokerId(resource.name)
-            if (brokerId == config.brokerId)
-              createResponseConfig(config, createBrokerConfigEntry(includeSynonyms))
+            if (resource.name == null || resource.name.isEmpty)
+              createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
+                createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms))
+            else if (resourceNameToBrokerId(resource.name) == config.brokerId)
+              createResponseConfig(allConfigs(config),
+                createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
             else
-              throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId}, but received $brokerId")
+              throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}")
 
           case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
         }
@@ -361,8 +367,12 @@ class AdminManager(val config: KafkaConfig,
           case ResourceType.BROKER =>
             val brokerId = if (resource.name == null || resource.name.isEmpty)
               None
-            else
-              Some(resourceNameToBrokerId(resource.name))
+            else {
+              val id = resourceNameToBrokerId(resource.name)
+              if (id != this.config.brokerId)
+                throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}")
+              Some(id)
+            }
             val configProps = new Properties
             config.entries.asScala.foreach { configEntry =>
               configProps.setProperty(configEntry.name, configEntry.value)
@@ -459,13 +469,14 @@ class AdminManager(val config: KafkaConfig,
     new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
   }
 
-  private def createBrokerConfigEntry(includeSynonyms: Boolean)
+  private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
                                      (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
     val allNames = brokerSynonyms(name)
     val configEntryType = configType(name, allNames)
     val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
     val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
     val allSynonyms = configSynonyms(name, allNames, isSensitive)
+        .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
     val synonyms = if (!includeSynonyms) List.empty else allSynonyms
     val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
     val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 49d9953..1224274 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -18,8 +18,7 @@
 
 package kafka.server
 
-import java.io.Closeable
-import java.io.File
+import java.io.{Closeable, File, FileOutputStream, FileWriter}
 import java.nio.file.{Files, StandardCopyOption}
 import java.lang.management.ManagementFactory
 import java.util
@@ -27,6 +26,7 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit}
 import javax.management.ObjectName
 
+import kafka.admin.ConfigCommand
 import kafka.api.SaslSetup
 import kafka.log.LogConfig
 import kafka.coordinator.group.OffsetConfig
@@ -136,32 +136,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   @Test
-  def testKeystoreUpdate(): Unit = {
-    val producer = createProducer(trustStoreFile1, retries = 0)
-    val consumer = createConsumer("group1", trustStoreFile1)
-    verifyProduceConsume(producer, consumer, 10)
-
-    // Producer with new truststore should fail to connect before keystore update
-    val producer2 = createProducer(trustStoreFile2, retries = 0)
-    verifyAuthenticationFailure(producer2)
-
-    // Update broker keystore
-    configureDynamicKeystoreInZooKeeper(servers.head.config, servers.map(_.config.brokerId), sslProperties2)
-    waitForKeystore(sslProperties2)
-
-    // New producer with old truststore should fail to connect
-    val producer1 = createProducer(trustStoreFile1, retries = 0)
-    verifyAuthenticationFailure(producer1)
-
-    // New producer with new truststore should work
-    val producer3 = createProducer(trustStoreFile2, retries = 0)
-    verifyProduceConsume(producer3, consumer, 10)
-
-    // Old producer with old truststore should continue to work (with their old connections)
-    verifyProduceConsume(producer, consumer, 10)
-  }
-
-  @Test
   def testKeyStoreDescribeUsingAdminClient(): Unit = {
 
     def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, expectedProps: Properties): Unit = {
@@ -220,7 +194,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   @Test
-  def testKeyStoreAlterUsingAdminClient(): Unit = {
+  def testKeyStoreAlter(): Unit = {
     val topic2 = "testtopic2"
     TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
 
@@ -229,20 +203,28 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val (producerThread, consumerThread) = startProduceConsume(retries = 0)
     TestUtils.waitUntilTrue(() => consumerThread.received >= 10, "Messages not received")
 
+    // Producer with new truststore should fail to connect before keystore update
+    val producer1 = createProducer(trustStoreFile2, retries = 0)
+    verifyAuthenticationFailure(producer1)
+
     // Update broker keystore for external listener
-    val adminClient = adminClients.head
-    alterSslKeystore(adminClient, sslProperties2, SecureExternal)
+    alterSslKeystoreUsingConfigCommand(sslProperties2, SecureExternal)
 
-    // Produce/consume should work with new truststore
+    // New producer with old truststore should fail to connect
+    val producer2 = createProducer(trustStoreFile1, retries = 0)
+    verifyAuthenticationFailure(producer2)
+
+    // Produce/consume should work with new truststore with new producer/consumer
     val producer = createProducer(trustStoreFile2, retries = 0)
     val consumer = createConsumer("group1", trustStoreFile2, topic2)
     verifyProduceConsume(producer, consumer, 10, topic2)
 
     // Broker keystore update for internal listener with incompatible keystore should fail without update
+    val adminClient = adminClients.head
     alterSslKeystore(adminClient, sslProperties2, SecureInternal, expectFailure = true)
     verifyProduceConsume(producer, consumer, 10, topic2)
 
-    // Broker keystore update for internal listener with incompatible keystore should succeed
+    // Broker keystore update for internal listener with compatible keystore should succeed
     val sslPropertiesCopy = sslProperties1.clone().asInstanceOf[Properties]
     val oldFile = new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
     val newFile = File.createTempFile("keystore", ".jks")
@@ -613,15 +595,48 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     configDescription
   }
 
+  private def sslProperties(props: Properties, configPrefix: String): Properties = {
+    val sslProps = new Properties
+    sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
+    sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG))
+    sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
+    sslProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
+    sslProps
+  }
+
   private def alterSslKeystore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean  = false): Unit = {
-    val newProps = new Properties
     val configPrefix = new ListenerName(listener).configPrefix
-    val keystoreLocation = props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)
-    newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation)
-    newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG))
-    newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
-    newProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
-    reconfigureServers(newProps, perBrokerConfig = true, (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation), expectFailure)
+    val newProps = sslProperties(props, configPrefix)
+    reconfigureServers(newProps, perBrokerConfig = true,
+      (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)), expectFailure)
+  }
+
+  private def alterSslKeystoreUsingConfigCommand(props: Properties, listener: String): Unit = {
+    val configPrefix = new ListenerName(listener).configPrefix
+    val newProps = sslProperties(props, configPrefix)
+
+    val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(SecurityProtocol.SSL, Some(trustStoreFile1), None)
+    val propsFile = TestUtils.tempFile()
+    val propsWriter = new FileWriter(propsFile)
+    try {
+      securityProps.asScala.foreach {
+        case (k, v: Password) => propsWriter.write(s"$k=${v.value}\n")
+        case (k, v: util.List[_]) => propsWriter.write(s"""$k=${v.asScala.mkString(",")}\n""")
+        case (k, v) => propsWriter.write(s"$k=$v\n")
+      }
+    } finally {
+      propsWriter.close()
+    }
+
+    servers.foreach { server =>
+      val args = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, new ListenerName(SecureInternal)),
+        "--command-config", propsFile.getAbsolutePath,
+        "--alter", "--add-config", newProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
+        "--entity-type", "brokers",
+        "--entity-name", server.config.brokerId.toString)
+      ConfigCommand.main(args)
+    }
+    waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
   }
 
   private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index acac907..6e78423 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -16,6 +16,7 @@
  */
 package kafka.admin
 
+import java.util
 import java.util.Properties
 
 import kafka.admin.ConfigCommand.ConfigCommandOptions
@@ -23,9 +24,13 @@ import kafka.common.InvalidConfigException
 import kafka.server.ConfigEntityName
 import kafka.utils.Logging
 import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.internals.KafkaFutureImpl
+import org.apache.kafka.common.{KafkaFuture, Node}
 import org.apache.kafka.common.security.scram.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
-import org.easymock.EasyMock
+import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -137,22 +142,79 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldAddBrokerConfig(): Unit = {
-    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+  def shouldAddBrokerQuotaConfig(): Unit = { // replication throttle configs are altered with --zookeeper
+    val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "1",
       "--entity-type", "brokers",
       "--alter",
-      "--add-config", "a=b,c=d"))
+      "--add-config", "leader.replication.throttled.rate=10,follower.replication.throttled.rate=20"))
 
     case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
       override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
         assertEquals(Seq(1), brokerIds)
-        assertEquals("b", configChange.get("a"))
-        assertEquals("d", configChange.get("c"))
+        assertEquals("10", configChange.get("leader.replication.throttled.rate"))
+        assertEquals("20", configChange.get("follower.replication.throttled.rate"))
       }
     }
 
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
+    ConfigCommand.alterConfig(null, alterOpts, new TestAdminZkClient(zkClient)) // assertions run inside changeBrokerConfig
+  }
+
+  @Test
+  def shouldAddBrokerDynamicConfig(): Unit = {
+    // Per-broker case: the expected resource name is the broker id given via --entity-name.
+    verifyAlterBrokerConfig(new Node(1, "localhost", 9092), "1", List("--entity-name", "1"))
+  }
+
+  @Test
+  def shouldAddDefaultBrokerDynamicConfig(): Unit = {
+    // Default-entity case: with --entity-default the expected broker resource name is the empty string.
+    verifyAlterBrokerConfig(new Node(1, "localhost", 9092), "", List("--entity-default"))
+  }
+
+  // Runs ConfigCommand.alterBrokerConfig against a stubbed AdminClient whose describeConfigs result holds
+  // num.io.threads=5, then checks the entries handed to alterConfigs and that both result mocks were used.
+  def verifyAlterBrokerConfig(node: Node, resourceName: String, resourceOpts: List[String]): Unit = {
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "message.max.bytes=10") ++ resourceOpts
+    val alterOpts = new ConfigCommandOptions(optsList.toArray)
+    val brokerConfigs = mutable.Map[String, String]("num.io.threads" -> "5")
+    val resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName)
+    val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5"))
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    future.complete(util.Collections.singletonMap(resource, new Config(configEntries)))
+    val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    EasyMock.expect(describeResult.all()).andReturn(future).once()
+    val alterFuture = new KafkaFutureImpl[Void]
+    alterFuture.complete(null)
+    val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+    EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+    // Stub client: asserts on the describe/alter requests and records the altered entries in brokerConfigs.
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+        assertEquals(1, resources.size)
+        val resource = resources.iterator.next
+        assertEquals(ConfigResource.Type.BROKER, resource.`type`)
+        assertEquals(resourceName, resource.name)
+        describeResult
+      }
+
+      override def alterConfigs(configs: util.Map[ConfigResource, Config], options: AlterConfigsOptions): AlterConfigsResult = {
+        assertEquals(1, configs.size)
+        val entry = configs.entrySet.iterator.next
+        val resource = entry.getKey
+        val config = entry.getValue
+        assertEquals(ConfigResource.Type.BROKER, resource.`type`)
+        config.entries.asScala.foreach { e => brokerConfigs.put(e.name, e.value) }
+        alterResult
+      }
+    }
+    EasyMock.replay(alterResult, describeResult)
+    ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, resourceName)
+    assertEquals(Map("message.max.bytes" -> "10", "num.io.threads" -> "5"), brokerConfigs.toMap)
+    EasyMock.verify(alterResult, describeResult) // fails unless the expected all() calls were made
   }
 
   @Test
@@ -183,7 +245,17 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-name", "1,2,3", //Don't support multiple brokers currently
       "--entity-type", "brokers",
       "--alter",
-      "--add-config", "a=b"))
+      "--add-config", "leader.replication.throttled.rate=10"))
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
+  }
+
+  @Test (expected = classOf[IllegalArgumentException])
+  def shouldNotUpdateDynamicBrokerConfigUsingZooKeeper(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "message.max.bytes=100000")) // same key the AdminClient-path tests alter; not accepted with --zookeeper
     ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.