You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2020/04/27 18:12:40 UTC
[kafka] branch trunk updated: KAFKA-9612: Add an option to
kafka-configs.sh to add configs from a prop file (KIP-574)
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2ca19cf KAFKA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574)
2ca19cf is described below
commit 2ca19cf603d457b0ec25e5e0f4769560b4c52d6d
Author: Aneel Nazareth <an...@confluent.io>
AuthorDate: Mon Apr 27 23:40:34 2020 +0530
KAFKA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574)
Add an `--add-config-file` option to kafka-configs.sh that adds the configs from a properties file.
Testing: Added new tests to ConfigCommandTest.scala
Author: Aneel Nazareth <an...@confluent.io>
Reviewers: David Jacot <dj...@confluent.io>, Manikumar Reddy <ma...@gmail.com>
Closes #8184 from WanderingStar/KAFKA-9612
---
.../test/java/org/apache/kafka/test/TestUtils.java | 14 ++++
.../src/main/scala/kafka/admin/ConfigCommand.scala | 28 +++++---
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 78 +++++++++++++++++++++-
3 files changed, 109 insertions(+), 11 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index fe0b4a0..f9be363 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import java.io.FileWriter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
@@ -229,6 +230,19 @@ public class TestUtils {
}
/**
+ * Create a file with the given contents in the default temporary-file directory,
+ * using `kafka` as the prefix and `tmp` as the suffix to generate its name.
+ */
+ public static File tempFile(final String contents) throws IOException {
+ final File file = tempFile();
+ final FileWriter writer = new FileWriter(file);
+ writer.write(contents);
+ writer.close();
+
+ return file;
+ }
+
+ /**
* Create a temporary relative directory in the default temporary-file directory with the given prefix.
*
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index a43fe9e..5bac33d 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -247,6 +247,10 @@ object ConfigCommand extends Config {
private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
val props = new Properties
+ if (opts.options.has(opts.addConfigFile)) {
+ val file = opts.options.valueOf(opts.addConfigFile)
+ props.putAll(Utils.loadProps(file))
+ }
if (opts.options.has(opts.addConfig)) {
// Split list by commas, but avoid those in [], then into KV pairs
// Each KV pair is of format key=value, split them into key and value, using -1 as the limit for split() to
@@ -258,10 +262,10 @@ object ConfigCommand extends Config {
require(configsToBeAdded.forall(config => config.length == 2), "Invalid entity config: all configs to be added must be in the format \"key=val\".")
//Create properties, parsing square brackets from values if necessary
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
- if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
- println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
- s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
- }
+ }
+ if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
+ println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
+ s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
}
props
}
@@ -645,6 +649,9 @@ object ConfigCommand extends Config {
s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
+ val addConfigFile = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.")
+ .withRequiredArg
+ .ofType(classOf[String])
val deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'")
.withRequiredArg
.ofType(classOf[String])
@@ -764,10 +771,15 @@ object ConfigCommand extends Config {
} else if (!hasEntityName)
throw new IllegalArgumentException(s"an entity name must be specified with --alter of ${entityTypeVals.mkString(",")}")
- val isAddConfigPresent: Boolean = options.has(addConfig)
- val isDeleteConfigPresent: Boolean = options.has(deleteConfig)
- if (!isAddConfigPresent && !isDeleteConfigPresent)
- throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter")
+ val isAddConfigPresent = options.has(addConfig)
+ val isAddConfigFilePresent = options.has(addConfigFile)
+ val isDeleteConfigPresent = options.has(deleteConfig)
+
+ if(isAddConfigPresent && isAddConfigFilePresent)
+ throw new IllegalArgumentException("Only one of --add-config or --add-config-file must be specified")
+
+ if(!isAddConfigPresent && !isAddConfigFilePresent && !isDeleteConfigPresent)
+ throw new IllegalArgumentException("At least one of --add-config, --add-config-file, or --delete-config must be specified with --alter")
}
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index a963c74..1a7d43e 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -26,15 +26,16 @@ import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig}
import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.config.{ConfigException, ConfigResource}
-import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.Node
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.test.TestUtils
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
@@ -161,11 +162,24 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
createOpts.checkArgs()
createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+ "--entity-name", "1",
+ "--entity-type", entityType,
+ "--alter",
+ "--add-config-file", "/tmp/new.properties"))
+ createOpts.checkArgs()
+
+ createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
shortFlag, "1",
"--alter",
"--add-config", "a=b,c=d"))
createOpts.checkArgs()
+ createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+ shortFlag, "1",
+ "--alter",
+ "--add-config-file", "/tmp/new.properties"))
+ createOpts.checkArgs()
+
// For alter and deleted config
createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
"--entity-name", "1",
@@ -226,6 +240,47 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
assertTrue(addedProps2.getProperty("f").isEmpty)
}
+ @Test(expected = classOf[IllegalArgumentException])
+ def shouldFailIfAddAndAddFile(): Unit = {
+ // Should not parse correctly
+ val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+ "--entity-name", "1",
+ "--entity-type", "brokers",
+ "--alter",
+ "--add-config", "a=b,c=d",
+ "--add-config-file", "/tmp/new.properties"
+ ))
+ createOpts.checkArgs()
+ }
+
+ @Test
+ def testParseConfigsToBeAddedForAddConfigFile(): Unit = {
+ val fileContents =
+ """a=b
+ |c = d
+ |json = {"key": "val"}
+ |nested = [[1, 2], [3, 4]]
+ |""".stripMargin
+
+ val file = TestUtils.tempFile(fileContents)
+
+ val addConfigFileArgs = Array("--add-config-file", file.getPath)
+
+ val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+ "--entity-name", "1",
+ "--entity-type", "brokers",
+ "--alter")
+ ++ addConfigFileArgs)
+ createOpts.checkArgs()
+
+ val addedProps = ConfigCommand.parseConfigsToBeAdded(createOpts)
+ assertEquals(4, addedProps.size())
+ assertEquals("b", addedProps.getProperty("a"))
+ assertEquals("d", addedProps.getProperty("c"))
+ assertEquals("{\"key\": \"val\"}", addedProps.getProperty("json"))
+ assertEquals("[[1, 2], [3, 4]]", addedProps.getProperty("nested"))
+ }
+
def doTestOptionEntityTypeNames(zkConfig: Boolean): Unit = {
val connectOpts = if (zkConfig)
("--zookeeper", zkConnect)
@@ -424,12 +479,29 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@Test
def shouldAlterTopicConfig(): Unit = {
+ doShouldAlterTopicConfig(false)
+ }
+
+ @Test
+ def shouldAlterTopicConfigFile(): Unit = {
+ doShouldAlterTopicConfig(true)
+ }
+
+ def doShouldAlterTopicConfig(file: Boolean): Unit = {
+ var filePath = ""
+ val addedConfigs = Seq("delete.retention.ms=1000000", "min.insync.replicas=2")
+ if (file) {
+ val file = TestUtils.tempFile(addedConfigs.mkString("\n"))
+ filePath = file.getPath
+ }
+
val resourceName = "my-topic"
val alterOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
"--entity-name", resourceName,
"--entity-type", "topics",
"--alter",
- "--add-config", "delete.retention.ms=1000000,min.insync.replicas=2",
+ if (file) "--add-config-file" else "--add-config",
+ if (file) filePath else addedConfigs.mkString(","),
"--delete-config", "unclean.leader.election.enable"))
var alteredConfigs = false