You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2020/04/27 18:12:40 UTC

[kafka] branch trunk updated: KAFKA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2ca19cf  KAFKA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574)
2ca19cf is described below

commit 2ca19cf603d457b0ec25e5e0f4769560b4c52d6d
Author: Aneel Nazareth <an...@confluent.io>
AuthorDate: Mon Apr 27 23:40:34 2020 +0530

    KAFKA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574)
    
    Add a `--add-config-file` option to kafka-configs.sh that adds the configs from a properties file.
    Testing: Added new tests to ConfigCommandTest.scala
    
    Author: Aneel Nazareth <an...@confluent.io>
    
    Reviewers: David Jacot <dj...@confluent.io>, Manikumar Reddy <ma...@gmail.com>
    
    Closes #8184 from WanderingStar/KAFKA-9612
---
 .../test/java/org/apache/kafka/test/TestUtils.java | 14 ++++
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 28 +++++---
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 78 +++++++++++++++++++++-
 3 files changed, 109 insertions(+), 11 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index fe0b4a0..f9be363 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import java.io.FileWriter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
@@ -229,6 +230,19 @@ public class TestUtils {
     }
 
     /**
+     * Create a file with the given contents in the default temporary-file directory,
+     * using `kafka` as the prefix and `tmp` as the suffix to generate its name.
+     */
+    public static File tempFile(final String contents) throws IOException {
+        final File file = tempFile();
+        final FileWriter writer = new FileWriter(file);
+        writer.write(contents);
+        writer.close();
+
+        return file;
+    }
+
+    /**
      * Create a temporary relative directory in the default temporary-file directory with the given prefix.
      *
      * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index a43fe9e..5bac33d 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -247,6 +247,10 @@ object ConfigCommand extends Config {
 
   private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
     val props = new Properties
+    if (opts.options.has(opts.addConfigFile)) {
+      val file = opts.options.valueOf(opts.addConfigFile)
+      props.putAll(Utils.loadProps(file))
+    }
     if (opts.options.has(opts.addConfig)) {
       // Split list by commas, but avoid those in [], then into KV pairs
       // Each KV pair is of format key=value, split them into key and value, using -1 as the limit for split() to
@@ -258,10 +262,10 @@ object ConfigCommand extends Config {
       require(configsToBeAdded.forall(config => config.length == 2), "Invalid entity config: all configs to be added must be in the format \"key=val\".")
       //Create properties, parsing square brackets from values if necessary
       configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
-      if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
-        println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
-          s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
-      }
+    }
+    if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
+      println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
+        s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
     }
     props
   }
@@ -645,6 +649,9 @@ object ConfigCommand extends Config {
             s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
             .withRequiredArg
             .ofType(classOf[String])
+    val addConfigFile = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.")
+            .withRequiredArg
+            .ofType(classOf[String])
     val deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'")
             .withRequiredArg
             .ofType(classOf[String])
@@ -764,10 +771,15 @@ object ConfigCommand extends Config {
         } else if (!hasEntityName)
           throw new IllegalArgumentException(s"an entity name must be specified with --alter of ${entityTypeVals.mkString(",")}")
 
-        val isAddConfigPresent: Boolean = options.has(addConfig)
-        val isDeleteConfigPresent: Boolean = options.has(deleteConfig)
-        if (!isAddConfigPresent && !isDeleteConfigPresent)
-          throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter")
+        val isAddConfigPresent = options.has(addConfig)
+        val isAddConfigFilePresent = options.has(addConfigFile)
+        val isDeleteConfigPresent = options.has(deleteConfig)
+
+        if(isAddConfigPresent && isAddConfigFilePresent)
+          throw new IllegalArgumentException("Only one of --add-config or --add-config-file must be specified")
+
+        if(!isAddConfigPresent && !isAddConfigFilePresent && !isDeleteConfigPresent)
+          throw new IllegalArgumentException("At least one of --add-config, --add-config-file, or --delete-config must be specified with --alter")
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index a963c74..1a7d43e 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -26,15 +26,16 @@ import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig}
 import kafka.utils.{Exit, Logging}
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.config.{ConfigException, ConfigResource}
-import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.test.TestUtils
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
@@ -161,11 +162,24 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     createOpts.checkArgs()
 
     createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+      "--entity-name", "1",
+      "--entity-type", entityType,
+      "--alter",
+      "--add-config-file", "/tmp/new.properties"))
+    createOpts.checkArgs()
+
+    createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
       shortFlag, "1",
       "--alter",
       "--add-config", "a=b,c=d"))
     createOpts.checkArgs()
 
+    createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+      shortFlag, "1",
+      "--alter",
+      "--add-config-file", "/tmp/new.properties"))
+    createOpts.checkArgs()
+
     // For alter and deleted config
     createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
       "--entity-name", "1",
@@ -226,6 +240,47 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     assertTrue(addedProps2.getProperty("f").isEmpty)
   }
 
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailIfAddAndAddFile(): Unit = {
+    // Should not parse correctly
+    val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "a=b,c=d",
+      "--add-config-file", "/tmp/new.properties"
+    ))
+    createOpts.checkArgs()
+  }
+
+  @Test
+  def testParseConfigsToBeAddedForAddConfigFile(): Unit = {
+    val fileContents =
+      """a=b
+        |c = d
+        |json = {"key": "val"}
+        |nested = [[1, 2], [3, 4]]
+        |""".stripMargin
+
+    val file = TestUtils.tempFile(fileContents)
+
+    val addConfigFileArgs = Array("--add-config-file", file.getPath)
+
+    val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter")
+      ++ addConfigFileArgs)
+    createOpts.checkArgs()
+
+    val addedProps = ConfigCommand.parseConfigsToBeAdded(createOpts)
+    assertEquals(4, addedProps.size())
+    assertEquals("b", addedProps.getProperty("a"))
+    assertEquals("d", addedProps.getProperty("c"))
+    assertEquals("{\"key\": \"val\"}", addedProps.getProperty("json"))
+    assertEquals("[[1, 2], [3, 4]]", addedProps.getProperty("nested"))
+  }
+
   def doTestOptionEntityTypeNames(zkConfig: Boolean): Unit = {
     val connectOpts = if (zkConfig)
       ("--zookeeper", zkConnect)
@@ -424,12 +479,29 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldAlterTopicConfig(): Unit = {
+    doShouldAlterTopicConfig(false)
+  }
+
+  @Test
+  def shouldAlterTopicConfigFile(): Unit = {
+    doShouldAlterTopicConfig(true)
+  }
+
+  def doShouldAlterTopicConfig(file: Boolean): Unit = {
+    var filePath = ""
+    val addedConfigs = Seq("delete.retention.ms=1000000", "min.insync.replicas=2")
+    if (file) {
+      val file = TestUtils.tempFile(addedConfigs.mkString("\n"))
+      filePath = file.getPath
+    }
+
     val resourceName = "my-topic"
     val alterOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
       "--entity-name", resourceName,
       "--entity-type", "topics",
       "--alter",
-      "--add-config", "delete.retention.ms=1000000,min.insync.replicas=2",
+      if (file) "--add-config-file" else "--add-config",
+      if (file) filePath else addedConfigs.mkString(","),
       "--delete-config", "unclean.leader.election.enable"))
     var alteredConfigs = false