You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/10 19:41:27 UTC
[kafka] branch trunk updated: KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0c1cde1080 KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)
0c1cde1080 is described below
commit 0c1cde10802456b1bc3f5f12f5e4d3d6ae400edd
Author: dengziming <de...@gmail.com>
AuthorDate: Wed May 11 03:41:17 2022 +0800
KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)
We can append/subtract multiple config values in kraft mode using the `IncrementalAlterConfig` RPC. For example: append/subtract topic config "cleanup.policy" with value="delete,compact" will end up treating "delete,compact" as a value not 2 values. This patch fixes the problem. Additionally, it update the zk logic to correctly handle duplicate additions.
Reviewers: Akhilesh Chaganti <ak...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/server/ConfigAdminManager.scala | 3 +-
.../server/metadata/BrokerMetadataPublisher.scala | 9 +++
.../kafka/server/metadata/MetadataPublisher.scala | 5 ++
.../kafka/api/PlaintextAdminIntegrationTest.scala | 71 +++++++++++++++++++---
.../metadata/BrokerMetadataListenerTest.scala | 4 ++
.../test/scala/unit/kafka/utils/TestUtils.scala | 19 ++++++
.../controller/ConfigurationControlManager.java | 16 +++--
.../ConfigurationControlManagerTest.java | 67 +++++++++++++++++---
8 files changed, 170 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index a7f5c6bdef..e7d6c33ab2 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -499,7 +499,8 @@ object ConfigAdminManager {
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
.getOrElse("")
.split(",").toList
- val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList
+ val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value))
+ val newValueList = oldValueList ::: appendingValueList
configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
}
case OpType.SUBTRACT => {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index e653e6e5b2..fb6bb61544 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -18,6 +18,7 @@
package kafka.server.metadata
import java.util.Properties
+import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
@@ -118,6 +119,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
*/
var _firstPublish = true
+ /**
+ * This is updated after all components (e.g. LogManager) has finished publishing the new metadata delta
+ */
+ val publishedOffsetAtomic = new AtomicLong(-1)
+
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
@@ -249,6 +255,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
if (_firstPublish) {
finishInitializingReplicaManager(newImage)
}
+ publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
} catch {
case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
throw t
@@ -257,6 +264,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
}
}
+ override def publishedOffset: Long = publishedOffsetAtomic.get()
+
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
index 104d164d9c..b63a2c056c 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
@@ -30,4 +30,9 @@ trait MetadataPublisher {
* delta to the previous image.
*/
def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
+
+ /**
+ * The highest offset of metadata topic which has been published
+ */
+ def publishedOffset: Long
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 81e9f692a8..d6aa7a7e9a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1755,8 +1755,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(0, allReassignmentsMap.size())
}
- @Test
- def testValidIncrementalAlterConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testValidIncrementalAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@@ -1793,6 +1794,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
alterResult.all.get
+ if (isKRaftTest()) {
+ TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
+ }
+
// Verify that topics were updated correctly
var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
var configs = describeResult.all.get
@@ -1807,7 +1812,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value)
assertEquals("delete,compact", configs.get(topic2Resource).get(LogConfig.CleanupPolicyProp).value)
- //verify subtract operation, including from an empty property
+ // verify subtract operation, including from an empty property
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT),
new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, "0"), AlterConfigOp.OpType.SUBTRACT)
@@ -1825,6 +1830,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
alterResult.all.get
+ if (isKRaftTest()) {
+ TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
+ }
+
// Verify that topics were updated correctly
describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
configs = describeResult.all.get
@@ -1852,7 +1861,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
- //Alter topics with validateOnly=true with invalid configs
+ // Alter topics with validateOnly=true with invalid configs
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET)
).asJava
@@ -1861,8 +1870,56 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
topic1Resource -> topic1AlterConfigs
).asJava, new AlterConfigsOptions().validateOnly(true))
- assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
- Some("Invalid config value for resource"))
+ if (isKRaftTest()) {
+ assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
+ Some("Invalid value zip for configuration compression.type"))
+ } else {
+ assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
+ Some("Invalid config value for resource"))
+ }
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAppendAlreadyExistsConfigsAndSubtractNotExistsConfigs(quorum: String): Unit = {
+ client = Admin.create(createConfig)
+
+ // Create topics
+ val topic = "incremental-alter-configs-topic"
+ val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+
+ val appendValues = s"0:${brokers.head.config.brokerId}"
+ val subtractValues = brokers.tail.map(broker => s"0:${broker.config.brokerId}").mkString(",")
+ assertNotEquals("", subtractValues)
+
+ val topicCreateConfigs = new Properties
+ topicCreateConfigs.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues)
+ createTopic(topic, numPartitions = 1, replicationFactor = 1, topicCreateConfigs)
+
+ // Append value that is already present
+ val topicAppendConfigs = Seq(
+ new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues), AlterConfigOp.OpType.APPEND),
+ ).asJavaCollection
+
+ val appendResult = client.incrementalAlterConfigs(Map(topicResource -> topicAppendConfigs).asJava)
+ appendResult.all.get
+
+ // Subtract values that are not present
+ val topicSubtractConfigs = Seq(
+ new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, subtractValues), AlterConfigOp.OpType.SUBTRACT)
+ ).asJavaCollection
+ val subtractResult = client.incrementalAlterConfigs(Map(topicResource -> topicSubtractConfigs).asJava)
+ subtractResult.all.get
+
+ if (isKRaftTest()) {
+ TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
+ }
+
+ // Verify that topics were updated correctly
+ val describeResult = client.describeConfigs(Seq(topicResource).asJava)
+ val configs = describeResult.all.get
+
+ assertEquals(appendValues, configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value)
}
@Test
@@ -2352,7 +2409,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* Note: this test requires some custom static broker and controller configurations, which are set up in
* BaseAdminIntegrationTest.modifyConfigs and BaseAdminIntegrationTest.kraftControllerConfigs.
*/
- @ParameterizedTest
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
client = Admin.create(super.createConfig)
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index d04377a21c..93dc8bdd39 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -84,6 +84,8 @@ class BrokerMetadataListenerTest {
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true),
delta.clusterDelta().broker(1))
}
+
+ override def publishedOffset: Long = -1
}).get()
} finally {
listener.close()
@@ -125,6 +127,8 @@ class BrokerMetadataListenerTest {
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
image = newImage
}
+
+ override def publishedOffset: Long = -1
}
private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bc51644cb5..31ba10f79c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1127,6 +1127,25 @@ object TestUtils extends Logging {
throw new IllegalStateException(s"Cannot get topic: $topic, partition: $partition in server metadata cache"))
}
+ /**
+ * Wait until the kraft broker metadata have caught up to the controller, before calling this, we should make sure
+ * the related metadata message has already been committed to the controller metadata log.
+ */
+ def ensureConsistentKRaftMetadata(
+ brokers: Seq[KafkaBroker],
+ controllerServer: ControllerServer,
+ msg: String = "Timeout waiting for controller metadata propagating to brokers"
+ ): Unit = {
+ val controllerOffset = controllerServer.raftManager.replicatedLog.endOffset().offset - 1
+ TestUtils.waitUntilTrue(
+ () => {
+ brokers.forall { broker =>
+ val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
+ metadataOffset >= controllerOffset
+ }
+ }, msg)
+ }
+
def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms"))
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 558e55b902..cde9d39569 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -213,15 +213,19 @@ public class ConfigurationControlManager {
"key " + key + " because its type is not LIST."));
return;
}
- List<String> newValueParts = getParts(newValue, key, configResource);
+ List<String> oldValueList = getParts(newValue, key, configResource);
if (opType == APPEND) {
- if (!newValueParts.contains(opValue)) {
- newValueParts.add(opValue);
+ for (String value : opValue.split(",")) {
+ if (!oldValueList.contains(value)) {
+ oldValueList.add(value);
+ }
+ }
+ } else {
+ for (String value : opValue.split(",")) {
+ oldValueList.remove(value);
}
- newValue = String.join(",", newValueParts);
- } else if (newValueParts.remove(opValue)) {
- newValue = String.join(",", newValueParts);
}
+ newValue = String.join(",", oldValueList);
break;
}
if (!Objects.equals(currentValue, newValue)) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 1ba9591e6a..007e84ffc0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -53,6 +53,7 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -137,10 +138,10 @@ public class ConfigurationControlManagerTest {
RecordTestUtils.assertBatchIteratorContains(asList(
asList(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("x,y,z"), (short) 0),
+ setName("abc").setValue("x,y,z"), CONFIG_RECORD.highestSupportedVersion()),
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("def").setValue("blah"), (short) 0))),
+ setName("def").setValue("blah"), CONFIG_RECORD.highestSupportedVersion()))),
manager.iterator(Long.MAX_VALUE));
}
@@ -159,7 +160,7 @@ public class ConfigurationControlManagerTest {
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("123"), (short) 0)),
+ setName("abc").setValue("123"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(BROKER0, new ApiError(Errors.INVALID_CONFIG,
"Can't SUBTRACT to key baz because its type is not LIST.")),
entry(MYTOPIC, ApiError.NONE))), result);
@@ -168,13 +169,59 @@ public class ConfigurationControlManagerTest {
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue(null), (short) 0)),
+ setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
entry("abc", entry(DELETE, "xyz"))))),
true));
}
+ @Test
+ public void testIncrementalAlterMultipleConfigValues() {
+ ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
+ setKafkaConfigSchema(SCHEMA).
+ build();
+
+ ControllerResult<Map<ConfigResource, ApiError>> result = manager.
+ incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456,789"))))), true);
+
+ assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+ new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("abc").setValue("123,456,789"), CONFIG_RECORD.highestSupportedVersion())),
+ toMap(entry(MYTOPIC, ApiError.NONE))), result);
+
+ RecordTestUtils.replayAll(manager, result.records());
+
+ // It's ok for the appended value to be already present
+ result = manager
+ .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456"))))), true);
+ assertEquals(
+ ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))),
+ result
+ );
+ RecordTestUtils.replayAll(manager, result.records());
+
+ result = manager
+ .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123,456"))))), true);
+ assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+ new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("abc").setValue("789"), CONFIG_RECORD.highestSupportedVersion())),
+ toMap(entry(MYTOPIC, ApiError.NONE))),
+ result);
+ RecordTestUtils.replayAll(manager, result.records());
+
+ // It's ok for the deleted value not to be present
+ result = manager
+ .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123456"))))), true);
+ assertEquals(
+ ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))),
+ result
+ );
+ RecordTestUtils.replayAll(manager, result.records());
+
+ assertEquals("789", manager.getConfigs(MYTOPIC).get("abc"));
+ }
+
@Test
public void testIncrementalAlterConfigsWithoutExistence() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
@@ -191,7 +238,7 @@ public class ConfigurationControlManagerTest {
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("ExistingTopic").
- setName("def").setValue("newVal"), (short) 0)),
+ setName("def").setValue("newVal"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(BROKER0, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
"Unknown resource.")),
entry(existingTopic, ApiError.NONE))), result);
@@ -242,9 +289,9 @@ public class ConfigurationControlManagerTest {
build();
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
- setName("foo.bar").setValue("123"), (short) 0), new ApiMessageAndVersion(
+ setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
- setName("quux").setValue("456"), (short) 0)),
+ setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
"Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
"type=TOPIC, name='mytopic'), configs={}). Got: " +
@@ -267,10 +314,10 @@ public class ConfigurationControlManagerTest {
List<ApiMessageAndVersion> expectedRecords1 = asList(
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("456"), (short) 0),
+ setName("abc").setValue("456"), CONFIG_RECORD.highestSupportedVersion()),
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("def").setValue("901"), (short) 0));
+ setName("def").setValue("901"), CONFIG_RECORD.highestSupportedVersion()));
assertEquals(ControllerResult.atomicOf(
expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(
@@ -286,7 +333,7 @@ public class ConfigurationControlManagerTest {
.setResourceName("mytopic")
.setName("abc")
.setValue(null),
- (short) 0)),
+ CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))),
true));