You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/10 19:41:27 UTC

[kafka] branch trunk updated: KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0c1cde1080 KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)
0c1cde1080 is described below

commit 0c1cde10802456b1bc3f5f12f5e4d3d6ae400edd
Author: dengziming <de...@gmail.com>
AuthorDate: Wed May 11 03:41:17 2022 +0800

    KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)
    
    We should be able to append/subtract multiple config values in KRaft mode using the `IncrementalAlterConfigs` RPC. However, appending/subtracting, for example, the topic config "cleanup.policy" with value="delete,compact" ends up treating "delete,compact" as a single value rather than two values. This patch fixes the problem. Additionally, it updates the zk logic to correctly handle duplicate additions.
    
    Reviewers: Akhilesh Chaganti <ak...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/ConfigAdminManager.scala    |  3 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  9 +++
 .../kafka/server/metadata/MetadataPublisher.scala  |  5 ++
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 71 +++++++++++++++++++---
 .../metadata/BrokerMetadataListenerTest.scala      |  4 ++
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 19 ++++++
 .../controller/ConfigurationControlManager.java    | 16 +++--
 .../ConfigurationControlManagerTest.java           | 67 +++++++++++++++++---
 8 files changed, 170 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index a7f5c6bdef..e7d6c33ab2 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -499,7 +499,8 @@ object ConfigAdminManager {
             .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
             .getOrElse("")
             .split(",").toList
-          val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList
+          val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value))
+          val newValueList = oldValueList ::: appendingValueList
           configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
         }
         case OpType.SUBTRACT => {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index e653e6e5b2..fb6bb61544 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -18,6 +18,7 @@
 package kafka.server.metadata
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicLong
 
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
@@ -118,6 +119,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
    */
   var _firstPublish = true
 
+  /**
+   * This is updated after all components (e.g. LogManager) have finished publishing the new metadata delta
+   */
+  val publishedOffsetAtomic = new AtomicLong(-1)
+
   override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
     val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
 
@@ -249,6 +255,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       if (_firstPublish) {
         finishInitializingReplicaManager(newImage)
       }
+      publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
     } catch {
       case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
         throw t
@@ -257,6 +264,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
     }
   }
 
+  override def publishedOffset: Long = publishedOffsetAtomic.get()
+
   def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
     conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
   }
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
index 104d164d9c..b63a2c056c 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
@@ -30,4 +30,9 @@ trait MetadataPublisher {
    *                               delta to the previous image.
    */
   def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
+
+  /**
+   * The highest offset of the metadata topic that has been published
+   */
+  def publishedOffset: Long
 }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 81e9f692a8..d6aa7a7e9a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1755,8 +1755,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(0, allReassignmentsMap.size())
   }
 
-  @Test
-  def testValidIncrementalAlterConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testValidIncrementalAlterConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -1793,6 +1794,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
     alterResult.all.get
 
+    if (isKRaftTest()) {
+      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
+    }
+
     // Verify that topics were updated correctly
     var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
     var configs = describeResult.all.get
@@ -1807,7 +1812,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value)
     assertEquals("delete,compact", configs.get(topic2Resource).get(LogConfig.CleanupPolicyProp).value)
 
-    //verify subtract operation, including from an empty property
+    // verify subtract operation, including from an empty property
     topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT),
       new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, "0"), AlterConfigOp.OpType.SUBTRACT)
@@ -1825,6 +1830,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
     alterResult.all.get
 
+    if (isKRaftTest()) {
+      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
+    }
+
     // Verify that topics were updated correctly
     describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
     configs = describeResult.all.get
@@ -1852,7 +1861,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
     assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
 
-    //Alter topics with validateOnly=true with invalid configs
+    // Alter topics with validateOnly=true with invalid configs
     topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET)
     ).asJava
@@ -1861,8 +1870,56 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       topic1Resource -> topic1AlterConfigs
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
-      Some("Invalid config value for resource"))
+    if (isKRaftTest()) {
+      assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
+        Some("Invalid value zip for configuration compression.type"))
+    } else {
+      assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
+        Some("Invalid config value for resource"))
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAppendAlreadyExistsConfigsAndSubtractNotExistsConfigs(quorum: String): Unit = {
+    client = Admin.create(createConfig)
+
+    // Create topics
+    val topic = "incremental-alter-configs-topic"
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+
+    val appendValues = s"0:${brokers.head.config.brokerId}"
+    val subtractValues = brokers.tail.map(broker => s"0:${broker.config.brokerId}").mkString(",")
+    assertNotEquals("", subtractValues)
+
+    val topicCreateConfigs = new Properties
+    topicCreateConfigs.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues)
+    createTopic(topic, numPartitions = 1, replicationFactor = 1, topicCreateConfigs)
+
+    // Append value that is already present
+    val topicAppendConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues), AlterConfigOp.OpType.APPEND),
+    ).asJavaCollection
+
+    val appendResult = client.incrementalAlterConfigs(Map(topicResource -> topicAppendConfigs).asJava)
+    appendResult.all.get
+
+    // Subtract values that are not present
+    val topicSubtractConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, subtractValues), AlterConfigOp.OpType.SUBTRACT)
+    ).asJavaCollection
+    val subtractResult = client.incrementalAlterConfigs(Map(topicResource -> topicSubtractConfigs).asJava)
+    subtractResult.all.get
+
+    if (isKRaftTest()) {
+      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
+    }
+
+    // Verify that topics were updated correctly
+    val describeResult = client.describeConfigs(Seq(topicResource).asJava)
+    val configs = describeResult.all.get
+
+    assertEquals(appendValues, configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value)
   }
 
   @Test
@@ -2352,7 +2409,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
    * Note: this test requires some custom static broker and controller configurations, which are set up in
    * BaseAdminIntegrationTest.modifyConfigs and BaseAdminIntegrationTest.kraftControllerConfigs.
    */
-  @ParameterizedTest
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
     client = Admin.create(super.createConfig)
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index d04377a21c..93dc8bdd39 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -84,6 +84,8 @@ class BrokerMetadataListenerTest {
             Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true),
             delta.clusterDelta().broker(1))
         }
+
+        override def publishedOffset: Long = -1
       }).get()
     } finally {
       listener.close()
@@ -125,6 +127,8 @@ class BrokerMetadataListenerTest {
     override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
       image = newImage
     }
+
+    override def publishedOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bc51644cb5..31ba10f79c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1127,6 +1127,25 @@ object TestUtils extends Logging {
       throw new IllegalStateException(s"Cannot get topic: $topic, partition: $partition in server metadata cache"))
   }
 
+  /**
+   * Wait until the KRaft brokers' metadata has caught up to the controller. Before calling this, we should make sure
+   * the related metadata message has already been committed to the controller metadata log.
+   */
+  def ensureConsistentKRaftMetadata(
+      brokers: Seq[KafkaBroker],
+      controllerServer: ControllerServer,
+      msg: String = "Timeout waiting for controller metadata propagating to brokers"
+  ): Unit = {
+    val controllerOffset = controllerServer.raftManager.replicatedLog.endOffset().offset - 1
+    TestUtils.waitUntilTrue(
+      () => {
+        brokers.forall { broker =>
+          val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
+          metadataOffset >= controllerOffset
+        }
+      }, msg)
+  }
+
   def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
     val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
     controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms"))
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 558e55b902..cde9d39569 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -213,15 +213,19 @@ public class ConfigurationControlManager {
                             "key " + key + " because its type is not LIST."));
                         return;
                     }
-                    List<String> newValueParts = getParts(newValue, key, configResource);
+                    List<String> oldValueList = getParts(newValue, key, configResource);
                     if (opType == APPEND) {
-                        if (!newValueParts.contains(opValue)) {
-                            newValueParts.add(opValue);
+                        for (String value : opValue.split(",")) {
+                            if (!oldValueList.contains(value)) {
+                                oldValueList.add(value);
+                            }
+                        }
+                    } else {
+                        for (String value : opValue.split(",")) {
+                            oldValueList.remove(value);
                         }
-                        newValue = String.join(",", newValueParts);
-                    } else if (newValueParts.remove(opValue)) {
-                        newValue = String.join(",", newValueParts);
                     }
+                    newValue = String.join(",", oldValueList);
                     break;
             }
             if (!Objects.equals(currentValue, newValue)) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 1ba9591e6a..007e84ffc0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -53,6 +53,7 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
 import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -137,10 +138,10 @@ public class ConfigurationControlManagerTest {
         RecordTestUtils.assertBatchIteratorContains(asList(
             asList(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("abc").setValue("x,y,z"), (short) 0),
+                    setName("abc").setValue("x,y,z"), CONFIG_RECORD.highestSupportedVersion()),
                 new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("def").setValue("blah"), (short) 0))),
+                    setName("def").setValue("blah"), CONFIG_RECORD.highestSupportedVersion()))),
             manager.iterator(Long.MAX_VALUE));
     }
 
@@ -159,7 +160,7 @@ public class ConfigurationControlManagerTest {
 
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("abc").setValue("123"), (short) 0)),
+                    setName("abc").setValue("123"), CONFIG_RECORD.highestSupportedVersion())),
                 toMap(entry(BROKER0, new ApiError(Errors.INVALID_CONFIG,
                             "Can't SUBTRACT to key baz because its type is not LIST.")),
                     entry(MYTOPIC, ApiError.NONE))), result);
@@ -168,13 +169,59 @@ public class ConfigurationControlManagerTest {
 
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("abc").setValue(null), (short) 0)),
+                    setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())),
                 toMap(entry(MYTOPIC, ApiError.NONE))),
             manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
                 entry("abc", entry(DELETE, "xyz"))))),
                 true));
     }
 
+    @Test
+    public void testIncrementalAlterMultipleConfigValues() {
+        ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
+            setKafkaConfigSchema(SCHEMA).
+            build();
+
+        ControllerResult<Map<ConfigResource, ApiError>> result = manager.
+            incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456,789"))))), true);
+
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+                new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+                    setName("abc").setValue("123,456,789"), CONFIG_RECORD.highestSupportedVersion())),
+                toMap(entry(MYTOPIC, ApiError.NONE))), result);
+
+        RecordTestUtils.replayAll(manager, result.records());
+
+        // It's ok for the appended value to be already present
+        result = manager
+            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456"))))), true);
+        assertEquals(
+            ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))),
+            result
+        );
+        RecordTestUtils.replayAll(manager, result.records());
+
+        result = manager
+            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123,456"))))), true);
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+                new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+                    setName("abc").setValue("789"), CONFIG_RECORD.highestSupportedVersion())),
+                toMap(entry(MYTOPIC, ApiError.NONE))),
+                result);
+        RecordTestUtils.replayAll(manager, result.records());
+
+        // It's ok for the deleted value not to be present
+        result = manager
+            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123456"))))), true);
+        assertEquals(
+            ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))),
+            result
+        );
+        RecordTestUtils.replayAll(manager, result.records());
+
+        assertEquals("789", manager.getConfigs(MYTOPIC).get("abc"));
+    }
+
     @Test
     public void testIncrementalAlterConfigsWithoutExistence() {
         ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
@@ -191,7 +238,7 @@ public class ConfigurationControlManagerTest {
 
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("ExistingTopic").
-                    setName("def").setValue("newVal"), (short) 0)),
+                    setName("def").setValue("newVal"), CONFIG_RECORD.highestSupportedVersion())),
             toMap(entry(BROKER0, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
                     "Unknown resource.")),
                 entry(existingTopic, ApiError.NONE))), result);
@@ -242,9 +289,9 @@ public class ConfigurationControlManagerTest {
             build();
         assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
-                    setName("foo.bar").setValue("123"), (short) 0), new ApiMessageAndVersion(
+                    setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
-                    setName("quux").setValue("456"), (short) 0)),
+                    setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion())),
             toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
                     "Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
                     "type=TOPIC, name='mytopic'), configs={}). Got: " +
@@ -267,10 +314,10 @@ public class ConfigurationControlManagerTest {
         List<ApiMessageAndVersion> expectedRecords1 = asList(
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue("456"), (short) 0),
+                setName("abc").setValue("456"), CONFIG_RECORD.highestSupportedVersion()),
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("def").setValue("901"), (short) 0));
+                setName("def").setValue("901"), CONFIG_RECORD.highestSupportedVersion()));
         assertEquals(ControllerResult.atomicOf(
                 expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))),
             manager.legacyAlterConfigs(
@@ -286,7 +333,7 @@ public class ConfigurationControlManagerTest {
                     .setResourceName("mytopic")
                     .setName("abc")
                     .setValue(null),
-                (short) 0)),
+                CONFIG_RECORD.highestSupportedVersion())),
             toMap(entry(MYTOPIC, ApiError.NONE))),
             manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))),
                 true));