You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2022/08/15 18:05:39 UTC

[kafka] branch trunk updated: MINOR: Appending value to LIST config should not generate empty string with … (#12503)

This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 31547145481 MINOR: Appending value to LIST config should not generate empty string with … (#12503)
31547145481 is described below

commit 3154714548174fd2b037de293c143f3c3db9f7db
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Tue Aug 16 02:05:28 2022 +0800

    MINOR: Appending value to LIST config should not generate empty string with … (#12503)
    
    Reviewers: dengziming <de...@gmail.com>, Luke Chen <sh...@gmail.com>
---
 .../scala/kafka/server/ConfigAdminManager.scala    |  5 ++--
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 32 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index cc7a98179dd..c6ea4e13953 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -497,8 +497,9 @@ object ConfigAdminManager {
             throw new InvalidConfigurationException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
             .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
-            .getOrElse("")
-            .split(",").toList
+            .filter(s => s.nonEmpty)
+            .map(_.split(",").toList)
+            .getOrElse(List.empty)
           val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value))
           val newValueList = oldValueList ::: appendingValueList
           configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 203c04a68a7..7121f98bb9c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2480,6 +2480,38 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = {
+    testAppendConfig(new Properties(), "0:0", "0:0")
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAppendConfigToExistentValue(ignored: String): Unit = {
+    val props = new Properties();
+    props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, "1:1")
+    testAppendConfig(props, "0:0", "1:1,0:0")
+  }
+
+  private def testAppendConfig(props: Properties, append: String, expected: String): Unit = {
+    client = Admin.create(createConfig)
+    createTopic(topic, topicConfig = props)
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+    val topicAlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, append), AlterConfigOp.OpType.APPEND),
+    ).asJavaCollection
+
+    val alterResult = client.incrementalAlterConfigs(Map(
+      topicResource -> topicAlterConfigs
+    ).asJava)
+    alterResult.all().get()
+
+    ensureConsistentKRaftMetadata()
+    val config = client.describeConfigs(List(topicResource).asJava).all().get().get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp)
+    assertEquals(expected, config.value())
+  }
+
   /**
    * Test that createTopics returns the dynamic configurations of the topics that were created.
    *