You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/03 00:32:01 UTC
[kafka] branch trunk updated: KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 505adef KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503)
505adef is described below
commit 505adefcecc6e7282b73f934aa8c7a59adcb675e
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Feb 2 16:31:58 2018 -0800
KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503)
Use the new AdminClient to describe and alter broker configs in ConfigCommand (via the --bootstrap-server option). Broker quota configs, as well as configs for the other entity types (topics, clients and users), will continue to be processed directly using ZooKeeper until KIP-248 is implemented.
Reviewers: Manikumar Reddy O <ma...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../common/requests/DescribeConfigsResponse.java | 2 +-
.../kafka/common/requests/RequestResponseTest.java | 33 ++++
.../src/main/scala/kafka/admin/ConfigCommand.scala | 173 ++++++++++++++++++---
.../src/main/scala/kafka/server/AdminManager.scala | 33 ++--
.../server/DynamicBrokerReconfigurationTest.scala | 95 ++++++-----
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 88 ++++++++++-
6 files changed, 341 insertions(+), 83 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 62012f4..e463618 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -74,7 +74,7 @@ public class DescribeConfigsResponse extends AbstractResponse {
new Field(CONFIG_NAME_KEY_NAME, STRING),
new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
new Field(READ_ONLY_KEY_NAME, BOOLEAN),
- new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
+ new Field(CONFIG_SOURCE_KEY_NAME, INT8),
new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b5420b5..0f7429e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -65,12 +65,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.test.TestUtils.toBuffer;
@@ -253,6 +255,7 @@ public class RequestResponseTest {
checkRequest(createDescribeConfigsRequestWithConfigEntries(1));
checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException());
checkResponse(createDescribeConfigsResponse(), 1);
+ checkDescribeConfigsResponseVersions();
checkRequest(createCreatePartitionsRequest());
checkRequest(createCreatePartitionsRequestWithAssignments());
checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
@@ -288,6 +291,36 @@ public class RequestResponseTest {
}
}
+ private void verifyDescribeConfigsResponse(DescribeConfigsResponse expected, DescribeConfigsResponse actual, int version) throws Exception {
+ for (org.apache.kafka.common.requests.Resource resource : expected.configs().keySet()) {
+ Collection<DescribeConfigsResponse.ConfigEntry> deserializedEntries1 = actual.config(resource).entries();
+ Iterator<DescribeConfigsResponse.ConfigEntry> expectedEntries = expected.config(resource).entries().iterator();
+ for (DescribeConfigsResponse.ConfigEntry entry : deserializedEntries1) {
+ DescribeConfigsResponse.ConfigEntry expectedEntry = expectedEntries.next();
+ assertEquals(expectedEntry.name(), entry.name());
+ assertEquals(expectedEntry.value(), entry.value());
+ assertEquals(expectedEntry.isReadOnly(), entry.isReadOnly());
+ assertEquals(expectedEntry.isSensitive(), entry.isSensitive());
+ if (version == 1 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG &&
+ expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
+ assertEquals(expectedEntry.source(), entry.source());
+ else
+ assertEquals(DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG, entry.source());
+ }
+ }
+ }
+
+ private void checkDescribeConfigsResponseVersions() throws Exception {
+ DescribeConfigsResponse response = createDescribeConfigsResponse();
+ DescribeConfigsResponse deserialized0 = (DescribeConfigsResponse) deserialize(response,
+ response.toStruct((short) 0), (short) 0);
+ verifyDescribeConfigsResponse(response, deserialized0, 0);
+
+ DescribeConfigsResponse deserialized1 = (DescribeConfigsResponse) deserialize(response,
+ response.toStruct((short) 1), (short) 1);
+ verifyDescribeConfigsResponse(response, deserialized1, 1);
+ }
+
private void checkErrorResponse(AbstractRequest req, Throwable e) throws Exception {
checkResponse(req.getErrorResponse(e), req.version());
}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index cf01a5f..9034dba 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -17,7 +17,8 @@
package kafka.admin
-import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.util.{Collections, Properties}
import joptsimple._
import kafka.common.Config
@@ -27,6 +28,9 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils.CommandLineUtils
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AlterConfigsOptions, Config => JConfig, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient}
+import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -52,6 +56,13 @@ import scala.collection.JavaConverters._
object ConfigCommand extends Config {
val DefaultScramIterations = 4096
+ // Dynamic broker configs can only be updated using the new AdminClient since they may require
+ // password encryption currently implemented only in the broker. For consistency with older versions,
+ // quota-related broker configs can still be updated using ZooKeeper. ConfigCommand will be migrated
+ // fully to the new AdminClient later (KIP-248).
+ val BrokerConfigsUpdatableUsingZooKeeper = Set(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
+ DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
+ DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
def main(args: Array[String]): Unit = {
@@ -63,21 +74,26 @@ object ConfigCommand extends Config {
opts.checkArgs()
val time = Time.SYSTEM
- val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
- Int.MaxValue, time)
- val adminZkClient = new AdminZkClient(zkClient)
- try {
- if (opts.options.has(opts.alterOpt))
- alterConfig(zkClient, opts, adminZkClient)
- else if (opts.options.has(opts.describeOpt))
- describeConfig(zkClient, opts, adminZkClient)
- } catch {
- case e: Throwable =>
- println("Error while executing config command " + e.getMessage)
- println(Utils.stackTrace(e))
- } finally {
- zkClient.close()
+ if (opts.options.has(opts.zkConnectOpt)) {
+ val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
+ Int.MaxValue, time)
+ val adminZkClient = new AdminZkClient(zkClient)
+
+ try {
+ if (opts.options.has(opts.alterOpt))
+ alterConfig(zkClient, opts, adminZkClient)
+ else if (opts.options.has(opts.describeOpt))
+ describeConfig(zkClient, opts, adminZkClient)
+ } catch {
+ case e: Throwable =>
+ println("Error while executing config command " + e.getMessage)
+ println(Utils.stackTrace(e))
+ } finally {
+ zkClient.close()
+ }
+ } else {
+ processBrokerConfig(opts)
}
}
@@ -90,6 +106,10 @@ object ConfigCommand extends Config {
if (entityType == ConfigType.User)
preProcessScramCredentials(configsToBeAdded)
+ if (entityType == ConfigType.Broker) {
+ require(configsToBeAdded.asScala.keySet.forall(BrokerConfigsUpdatableUsingZooKeeper.contains),
+ s"--bootstrap-server option must be specified to update broker configs $configsToBeAdded")
+ }
// compile the final set of configs
val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
@@ -172,6 +192,95 @@ object ConfigCommand extends Config {
Seq.empty
}
+ private def processBrokerConfig(opts: ConfigCommandOptions): Unit = {
+ val props = if (opts.options.has(opts.commandConfigOpt))
+ Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ else
+ new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+ val adminClient = JAdminClient.create(props)
+ val entityName = if (opts.options.has(opts.entityName))
+ opts.options.valueOf(opts.entityName)
+ else if (opts.options.has(opts.entityDefault))
+ ""
+ else
+ throw new IllegalArgumentException("At least one of --entity-name or --entity-default must be specified with --bootstrap-server")
+
+ val entityTypes = opts.options.valuesOf(opts.entityType).asScala
+ if (entityTypes.size != 1)
+ throw new IllegalArgumentException("Exactly one --entity-type must be specified with --bootstrap-server")
+ if (entityTypes.head != ConfigType.Broker)
+ throw new IllegalArgumentException(s"--zookeeper option must be specified for entity-type $entityTypes")
+
+ try {
+ if (opts.options.has(opts.alterOpt))
+ alterBrokerConfig(adminClient, opts, entityName)
+ else if (opts.options.has(opts.describeOpt))
+ describeBrokerConfig(adminClient, opts, entityName)
+ } catch {
+ case e: Throwable =>
+ println("Error while executing config command " + e.getMessage)
+ println(Utils.stackTrace(e))
+ } finally {
+ adminClient.close()
+ }
+
+ }
+
+ private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
+ val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
+ val configsToBeDeleted = parseConfigsToBeDeleted(opts)
+
+ // compile the final set of configs
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+ val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false)
+ .map { entry => (entry.name, entry) }.toMap
+
+ // fail the command if any of the configs to be deleted does not exist
+ val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+ if (invalidConfigs.nonEmpty)
+ throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+
+ val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+ val sensitiveEntries = newEntries.filter(_._2.value == null)
+ if (sensitiveEntries.nonEmpty)
+ throw new InvalidConfigException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
+ val newConfig = new JConfig(newEntries.asJava.values)
+
+ val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+
+ if (entityName.nonEmpty)
+ println(s"Completed updating config for broker: $entityName.")
+ else
+ println(s"Completed updating default config for brokers in the cluster,")
+ }
+
+ private def describeBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
+ val configs = brokerConfig(adminClient, entityName, includeSynonyms = true)
+ if (entityName.nonEmpty)
+ println(s"Configs for broker $entityName are:")
+ else
+ println(s"Default config for brokers in the cluster are:")
+ configs.foreach { config =>
+ val synonyms = config.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ")
+ println(s" ${config.name}=${config.value} sensitive=${config.isSensitive} synonyms={$synonyms}")
+ }
+ }
+
+ private def brokerConfig(adminClient: JAdminClient, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = {
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+ val configSource = if (!entityName.isEmpty)
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG
+ else
+ ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG
+ val describeOpts = new DescribeConfigsOptions().includeSynonyms(includeSynonyms)
+ val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOpts).all.get(30, TimeUnit.SECONDS)
+ configs.get(configResource).entries.asScala
+ .filter(entry => entry.source == configSource)
+ .toSeq
+ }
+
case class Entity(entityType: String, sanitizedName: Option[String]) {
val entityPath = sanitizedName match {
case Some(n) => entityType + "/" + n
@@ -249,12 +358,16 @@ object ConfigCommand extends Config {
}
}
+ private def entityNames(opts: ConfigCommandOptions): Seq[String] = {
+ val namesIterator = opts.options.valuesOf(opts.entityName).iterator
+ opts.options.specs.asScala
+ .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
+ .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "")
+ }
+
private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = {
val types = opts.options.valuesOf(opts.entityType).asScala
- val namesIterator = opts.options.valuesOf(opts.entityName).iterator
- val names = opts.options.specs.asScala
- .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
- .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "")
+ val names = entityNames(opts)
if (opts.options.has(opts.alterOpt) && names.size != types.size)
throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
@@ -285,6 +398,16 @@ object ConfigCommand extends Config {
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
+ val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka server to connect to. " +
+ "This is required for describing and altering broker configs.")
+ .withRequiredArg
+ .describedAs("server to connect to")
+ .ofType(classOf[String])
+ val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
+ "This is used only with --bootstrap-server option for describing and altering broker configs.")
+ .withRequiredArg
+ .describedAs("command config property file")
+ .ofType(classOf[String])
val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
val describeOpt = parser.accepts("describe", "List configs for the given entity.")
val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)")
@@ -293,7 +416,7 @@ object ConfigCommand extends Config {
val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)")
.withRequiredArg
.ofType(classOf[String])
- val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users (applies to corresponding entity type in command line)")
+ val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users/brokers (applies to corresponding entity type in command line)")
val nl = System.getProperty("line.separator")
val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
@@ -321,14 +444,18 @@ object ConfigCommand extends Config {
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
// check required args
- CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
val entityTypeVals = options.valuesOf(entityType).asScala
+
+ if (options.has(bootstrapServerOpt) == options.has(zkConnectOpt))
+ throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")
+ if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Topic) || entityTypeVals.contains(ConfigType.User))
+ CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
if(options.has(alterOpt)) {
- if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client)) {
+ if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Broker)) {
if (!options.has(entityName) && !options.has(entityDefault))
- throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users/clients")
+ throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users, clients or brokers")
} else if (!options.has(entityName))
throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}")
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 596dde0..8264f7c 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -284,9 +284,12 @@ class AdminManager(val config: KafkaConfig,
def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]], includeSynonyms: Boolean): Map[Resource, DescribeConfigsResponse.Config] = {
resourceToConfigNames.map { case (resource, configNames) =>
- def createResponseConfig(config: AbstractConfig, createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
- val allConfigs = config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
- val filteredConfigPairs = allConfigs.filter { case (configName, _) =>
+ def allConfigs(config: AbstractConfig) = {
+ config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
+ }
+ def createResponseConfig(configs: Map[String, Any],
+ createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
+ val filteredConfigPairs = configs.filter { case (configName, _) =>
/* Always returns true if configNames is None */
configNames.forall(_.contains(configName))
}.toIndexedSeq
@@ -304,14 +307,17 @@ class AdminManager(val config: KafkaConfig,
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
- createResponseConfig(logConfig, createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+ createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
case ResourceType.BROKER =>
- val brokerId = resourceNameToBrokerId(resource.name)
- if (brokerId == config.brokerId)
- createResponseConfig(config, createBrokerConfigEntry(includeSynonyms))
+ if (resource.name == null || resource.name.isEmpty)
+ createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
+ createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms))
+ else if (resourceNameToBrokerId(resource.name) == config.brokerId)
+ createResponseConfig(allConfigs(config),
+ createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
else
- throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId}, but received $brokerId")
+ throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received $resource.name")
case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
}
@@ -361,8 +367,12 @@ class AdminManager(val config: KafkaConfig,
case ResourceType.BROKER =>
val brokerId = if (resource.name == null || resource.name.isEmpty)
None
- else
- Some(resourceNameToBrokerId(resource.name))
+ else {
+ val id = resourceNameToBrokerId(resource.name)
+ if (id != this.config.brokerId)
+ throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name")
+ Some(id)
+ }
val configProps = new Properties
config.entries.asScala.foreach { configEntry =>
configProps.setProperty(configEntry.name, configEntry.value)
@@ -459,13 +469,14 @@ class AdminManager(val config: KafkaConfig,
new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
}
- private def createBrokerConfigEntry(includeSynonyms: Boolean)
+ private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
val allNames = brokerSynonyms(name)
val configEntryType = configType(name, allNames)
val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
val allSynonyms = configSynonyms(name, allNames, isSensitive)
+ .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 49d9953..1224274 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -18,8 +18,7 @@
package kafka.server
-import java.io.Closeable
-import java.io.File
+import java.io.{Closeable, File, FileOutputStream, FileWriter}
import java.nio.file.{Files, StandardCopyOption}
import java.lang.management.ManagementFactory
import java.util
@@ -27,6 +26,7 @@ import java.util.{Collections, Properties}
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit}
import javax.management.ObjectName
+import kafka.admin.ConfigCommand
import kafka.api.SaslSetup
import kafka.log.LogConfig
import kafka.coordinator.group.OffsetConfig
@@ -136,32 +136,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
@Test
- def testKeystoreUpdate(): Unit = {
- val producer = createProducer(trustStoreFile1, retries = 0)
- val consumer = createConsumer("group1", trustStoreFile1)
- verifyProduceConsume(producer, consumer, 10)
-
- // Producer with new truststore should fail to connect before keystore update
- val producer2 = createProducer(trustStoreFile2, retries = 0)
- verifyAuthenticationFailure(producer2)
-
- // Update broker keystore
- configureDynamicKeystoreInZooKeeper(servers.head.config, servers.map(_.config.brokerId), sslProperties2)
- waitForKeystore(sslProperties2)
-
- // New producer with old truststore should fail to connect
- val producer1 = createProducer(trustStoreFile1, retries = 0)
- verifyAuthenticationFailure(producer1)
-
- // New producer with new truststore should work
- val producer3 = createProducer(trustStoreFile2, retries = 0)
- verifyProduceConsume(producer3, consumer, 10)
-
- // Old producer with old truststore should continue to work (with their old connections)
- verifyProduceConsume(producer, consumer, 10)
- }
-
- @Test
def testKeyStoreDescribeUsingAdminClient(): Unit = {
def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, expectedProps: Properties): Unit = {
@@ -220,7 +194,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
@Test
- def testKeyStoreAlterUsingAdminClient(): Unit = {
+ def testKeyStoreAlter(): Unit = {
val topic2 = "testtopic2"
TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
@@ -229,20 +203,28 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val (producerThread, consumerThread) = startProduceConsume(retries = 0)
TestUtils.waitUntilTrue(() => consumerThread.received >= 10, "Messages not received")
+ // Producer with new truststore should fail to connect before keystore update
+ val producer1 = createProducer(trustStoreFile2, retries = 0)
+ verifyAuthenticationFailure(producer1)
+
// Update broker keystore for external listener
- val adminClient = adminClients.head
- alterSslKeystore(adminClient, sslProperties2, SecureExternal)
+ alterSslKeystoreUsingConfigCommand(sslProperties2, SecureExternal)
- // Produce/consume should work with new truststore
+ // New producer with old truststore should fail to connect
+ val producer2 = createProducer(trustStoreFile1, retries = 0)
+ verifyAuthenticationFailure(producer2)
+
+ // Produce/consume should work with new truststore with new producer/consumer
val producer = createProducer(trustStoreFile2, retries = 0)
val consumer = createConsumer("group1", trustStoreFile2, topic2)
verifyProduceConsume(producer, consumer, 10, topic2)
// Broker keystore update for internal listener with incompatible keystore should fail without update
+ val adminClient = adminClients.head
alterSslKeystore(adminClient, sslProperties2, SecureInternal, expectFailure = true)
verifyProduceConsume(producer, consumer, 10, topic2)
- // Broker keystore update for internal listener with incompatible keystore should succeed
+ // Broker keystore update for internal listener with compatible keystore should succeed
val sslPropertiesCopy = sslProperties1.clone().asInstanceOf[Properties]
val oldFile = new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
val newFile = File.createTempFile("keystore", ".jks")
@@ -613,15 +595,48 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
configDescription
}
+ private def sslProperties(props: Properties, configPrefix: String): Properties = {
+ val sslProps = new Properties
+ sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
+ sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG))
+ sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
+ sslProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
+ sslProps
+ }
+
private def alterSslKeystore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean = false): Unit = {
- val newProps = new Properties
val configPrefix = new ListenerName(listener).configPrefix
- val keystoreLocation = props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)
- newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation)
- newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG))
- newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
- newProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
- reconfigureServers(newProps, perBrokerConfig = true, (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation), expectFailure)
+ val newProps = sslProperties(props, configPrefix)
+ reconfigureServers(newProps, perBrokerConfig = true,
+ (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)), expectFailure)
+ }
+
+ private def alterSslKeystoreUsingConfigCommand(props: Properties, listener: String): Unit = {
+ val configPrefix = new ListenerName(listener).configPrefix
+ val newProps = sslProperties(props, configPrefix)
+
+ val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(SecurityProtocol.SSL, Some(trustStoreFile1), None)
+ val propsFile = TestUtils.tempFile()
+ val propsWriter = new FileWriter(propsFile)
+ try {
+ securityProps.asScala.foreach {
+ case (k, v: Password) => propsWriter.write(s"$k=${v.value}\n")
+ case (k, v: util.List[_]) => propsWriter.write(s"""$k=${v.asScala.mkString(",")}\n""")
+ case (k, v) => propsWriter.write(s"$k=$v\n")
+ }
+ } finally {
+ propsWriter.close()
+ }
+
+ servers.foreach { server =>
+ val args = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, new ListenerName(SecureInternal)),
+ "--command-config", propsFile.getAbsolutePath,
+ "--alter", "--add-config", newProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
+ "--entity-type", "brokers",
+ "--entity-name", server.config.brokerId.toString)
+ ConfigCommand.main(args)
+ }
+ waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
}
private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index acac907..6e78423 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -16,6 +16,7 @@
*/
package kafka.admin
+import java.util
import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
@@ -23,9 +24,13 @@ import kafka.common.InvalidConfigException
import kafka.server.ConfigEntityName
import kafka.utils.Logging
import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.internals.KafkaFutureImpl
+import org.apache.kafka.common.{KafkaFuture, Node}
import org.apache.kafka.common.security.scram.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
-import org.easymock.EasyMock
+import org.easymock.{EasyMock, IAnswer}
import org.junit.Assert._
import org.junit.Test
@@ -137,22 +142,79 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
- def shouldAddBrokerConfig(): Unit = {
- val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+ def shouldAddBrokerQuotaConfig(): Unit = {
+ val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "1",
"--entity-type", "brokers",
"--alter",
- "--add-config", "a=b,c=d"))
+ "--add-config", "leader.replication.throttled.rate=10,follower.replication.throttled.rate=20"))
case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals(Seq(1), brokerIds)
- assertEquals("b", configChange.get("a"))
- assertEquals("d", configChange.get("c"))
+ assertEquals("10", configChange.get("leader.replication.throttled.rate"))
+ assertEquals("20", configChange.get("follower.replication.throttled.rate"))
}
}
- ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
+ ConfigCommand.alterConfig(null, alterOpts, new TestAdminZkClient(zkClient))
+ }
+
+ @Test
+ def shouldAddBrokerDynamicConfig(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ verifyAlterBrokerConfig(node, "1", List("--entity-name", "1"))
+ }
+
+ @Test
+ def shouldAddDefaultBrokerDynamicConfig(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ verifyAlterBrokerConfig(node, "", List("--entity-default"))
+ }
+
+ def verifyAlterBrokerConfig(node: Node, resourceName: String, resourceOpts: List[String]): Unit = {
+ val optsList = List("--bootstrap-server", "localhost:9092",
+ "--entity-type", "brokers",
+ "--alter",
+ "--add-config", "message.max.bytes=10") ++ resourceOpts
+ val alterOpts = new ConfigCommandOptions(optsList.toArray)
+ val brokerConfigs = mutable.Map[String, String]("num.io.threads" -> "5")
+
+ val resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName)
+ val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5"))
+ val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+ future.complete(util.Collections.singletonMap(resource, new Config(configEntries)))
+ val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+ EasyMock.expect(describeResult.all()).andReturn(future).once()
+
+ val alterFuture = new KafkaFutureImpl[Void]
+ alterFuture.complete(null)
+ val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+ EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+
+ val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+ override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertEquals(1, resources.size)
+ val resource = resources.iterator.next
+ assertEquals(ConfigResource.Type.BROKER, resource.`type`)
+ assertEquals(resourceName, resource.name)
+ describeResult
+ }
+
+ override def alterConfigs(configs: util.Map[ConfigResource, Config], options: AlterConfigsOptions): AlterConfigsResult = {
+ assertEquals(1, configs.size)
+ val entry = configs.entrySet.iterator.next
+ val resource = entry.getKey
+ val config = entry.getValue
+ assertEquals(ConfigResource.Type.BROKER, resource.`type`)
+ config.entries.asScala.foreach { e => brokerConfigs.put(e.name, e.value) }
+ alterResult
+ }
+ }
+ EasyMock.replay(alterResult, describeResult)
+ ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, resourceName)
+ assertEquals(Map("message.max.bytes" -> "10", "num.io.threads" -> "5"), brokerConfigs.toMap)
+ EasyMock.reset(alterResult, describeResult)
}
@Test
@@ -183,7 +245,17 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-name", "1,2,3", //Don't support multiple brokers currently
"--entity-type", "brokers",
"--alter",
- "--add-config", "a=b"))
+ "--add-config", "leader.replication.throttled.rate=10"))
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
+ }
+
+ @Test (expected = classOf[IllegalArgumentException])
+ def shouldNotUpdateDynamicBrokerConfigUsingZooKeeper(): Unit = {
+ val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+ "--entity-name", "1",
+ "--entity-type", "brokers",
+ "--alter",
+ "--add-config", "message.max.size=100000"))
ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.