Posted to commits@kafka.apache.org by cm...@apache.org on 2021/12/06 00:42:17 UTC

[kafka] branch 3.1 updated: KAFKA-13490: Fix createTopics and incrementalAlterConfigs for KRaft mode #11416

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new f068a79  KAFKA-13490: Fix createTopics and incrementalAlterConfigs for KRaft mode #11416
f068a79 is described below

commit f068a7919e7943c9c3d920d8b20956b9f21608fb
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Mon Oct 18 13:11:53 2021 -0700

    KAFKA-13490: Fix createTopics and incrementalAlterConfigs for KRaft mode #11416
    
    For CreateTopics, fix a bug where, if one topic in a batch could not be created, every topic in
    the batch would fail with the same error code. Also make the error message for
    TOPIC_ALREADY_EXISTS consistent with the ZK-based code by including the topic name.
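
    As a client-side illustration of the first point, the following is a minimal sketch (not part
    of this commit) using the Java Admin client; the bootstrap address, topic names, and class name
    are invented for the example. Each topic in a batched CreateTopics request has its own future,
    so with this fix an already-existing topic fails on its own, with a message naming the topic:

        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.CreateTopicsResult;
        import org.apache.kafka.clients.admin.NewTopic;
        import org.apache.kafka.common.KafkaFuture;

        import java.util.Arrays;
        import java.util.Map;
        import java.util.Properties;
        import java.util.concurrent.ExecutionException;

        public class CreateTopicsBatchSketch {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    // Assume "existing" was created earlier and "fresh" was not.
                    CreateTopicsResult result = admin.createTopics(Arrays.asList(
                        new NewTopic("existing", 1, (short) 1),
                        new NewTopic("fresh", 1, (short) 1)));
                    // Each topic has its own future, so one failure does not fail the whole batch.
                    for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
                        try {
                            entry.getValue().get();
                            System.out.println(entry.getKey() + ": created");
                        } catch (ExecutionException e) {
                            // For TOPIC_ALREADY_EXISTS this now prints e.g.
                            // "existing: Topic 'existing' already exists."
                            System.out.println(entry.getKey() + ": " + e.getCause().getMessage());
                        }
                    }
                }
            }
        }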
    
    For IncrementalAlterConfigs, validate topic configurations before allowing them to be set. (This
    also applies to the configurations of newly created topics.) In addition, IncrementalAlterConfigs
    should ignore non-null payloads for DELETE operations. Previously we returned an error in that
    case, but the old ZK-based code ignores the payload, so we now do the same for compatibility.
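
    As a rough sketch of the DELETE behavior (again not part of this commit; the broker address,
    topic name, and class name are made up), a DELETE op submitted through the Java Admin client
    may carry a non-null value in its ConfigEntry; after this change the controller ignores that
    value instead of rejecting the request:

        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.AlterConfigOp;
        import org.apache.kafka.clients.admin.ConfigEntry;
        import org.apache.kafka.common.config.ConfigResource;

        import java.util.Collection;
        import java.util.Collections;
        import java.util.Map;
        import java.util.Properties;

        public class DeleteTopicConfigSketch {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
                    // The "1048576" value is irrelevant to a DELETE; only the config key is used.
                    AlterConfigOp deleteOp = new AlterConfigOp(
                        new ConfigEntry("segment.bytes", "1048576"), AlterConfigOp.OpType.DELETE);
                    Map<ConfigResource, Collection<AlterConfigOp>> request =
                        Collections.singletonMap(topic, Collections.singletonList(deleteOp));
                    // Removes the segment.bytes override on topic "foo", reverting it to the default.
                    admin.incrementalAlterConfigs(request).all().get();
                }
            }
        }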
    
    Reviewers: José Armando García Sancio <js...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../server/ControllerConfigurationValidator.scala  | 57 +++++++++++++++++
 .../main/scala/kafka/server/ControllerServer.scala |  1 +
 .../ControllerConfigurationValidatorTest.scala     | 71 ++++++++++++++++++++++
 .../controller/ConfigurationControlManager.java    | 36 +++++------
 .../kafka/controller/ConfigurationValidator.java   | 35 +++++++++++
 .../apache/kafka/controller/QuorumController.java  | 13 +++-
 .../controller/ReplicationControlManager.java      | 16 ++---
 .../ConfigurationControlManagerTest.java           | 40 ++++++++----
 .../controller/ReplicationControlManagerTest.java  |  5 +-
 9 files changed, 232 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
new file mode 100644
index 0000000..dfb78b2
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.log.LogConfig
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.controller.ConfigurationValidator
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import scala.collection.mutable
+
+class ControllerConfigurationValidator extends ConfigurationValidator {
+  override def validate(resource: ConfigResource, config: util.Map[String, String]): Unit = {
+    resource.`type`() match {
+      case TOPIC =>
+        val properties = new Properties()
+        val nullTopicConfigs = new mutable.ArrayBuffer[String]()
+        config.entrySet().forEach(e => {
+          if (e.getValue() == null) {
+            nullTopicConfigs += e.getKey()
+          } else {
+            properties.setProperty(e.getKey(), e.getValue())
+          }
+        })
+        if (nullTopicConfigs.nonEmpty) {
+          throw new InvalidRequestException("Null value not supported for topic configs : " +
+            nullTopicConfigs.mkString(","))
+        }
+        LogConfig.validate(properties)
+      case BROKER =>
+        // TODO: add broker configuration validation
+      case _ =>
+        // Note: we should never handle BROKER_LOGGER resources here, since changes to
+        // those resources are not persisted in the metadata.
+        throw new InvalidRequestException(s"Unknown resource type ${resource.`type`}")
+    }
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 137d727..ede71d4 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -173,6 +173,7 @@ class ControllerServer(
         setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
         setCreateTopicPolicy(createTopicPolicy.asJava).
         setAlterConfigPolicy(alterConfigPolicy.asJava).
+        setConfigurationValidator(new ControllerConfigurationValidator()).
         build()
 
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
new file mode 100644
index 0000000..3c85299
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import java.util.TreeMap
+import java.util.Collections.emptyMap
+
+import org.junit.jupiter.api.Test
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER_LOGGER, TOPIC}
+import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
+import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+
+class ControllerConfigurationValidatorTest {
+  @Test
+  def testUnknownResourceType(): Unit = {
+    val validator = new ControllerConfigurationValidator()
+    assertEquals("Unknown resource type BROKER_LOGGER",
+      assertThrows(classOf[InvalidRequestException], () => validator.validate(
+        new ConfigResource(BROKER_LOGGER, "foo"), emptyMap())). getMessage())
+  }
+
+  @Test
+  def testNullTopicConfigValue(): Unit = {
+    val validator = new ControllerConfigurationValidator()
+    val config = new TreeMap[String, String]()
+    config.put(SEGMENT_JITTER_MS_CONFIG, "10")
+    config.put(SEGMENT_BYTES_CONFIG, null)
+    config.put(SEGMENT_MS_CONFIG, null)
+    assertEquals("Null value not supported for topic configs : segment.bytes,segment.ms",
+      assertThrows(classOf[InvalidRequestException], () => validator.validate(
+        new ConfigResource(TOPIC, "foo"), config)). getMessage())
+  }
+
+  @Test
+  def testValidTopicConfig(): Unit = {
+    val validator = new ControllerConfigurationValidator()
+    val config = new TreeMap[String, String]()
+    config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
+    config.put(SEGMENT_BYTES_CONFIG, "67108864")
+    validator.validate(new ConfigResource(TOPIC, "foo"), config)
+  }
+
+  @Test
+  def testInvalidTopicConfig(): Unit = {
+    val validator = new ControllerConfigurationValidator()
+    val config = new TreeMap[String, String]()
+    config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
+    config.put(SEGMENT_BYTES_CONFIG, "67108864")
+    config.put("foobar", "abc")
+    assertEquals("Unknown topic config name: foobar",
+      assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
+        new ConfigResource(TOPIC, "foo"), config)). getMessage())
+  }
+}
\ No newline at end of file
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index a305719..83f1cbf 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -20,9 +20,9 @@ package org.apache.kafka.controller;
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.protocol.Errors;
@@ -49,6 +49,7 @@ import java.util.Optional;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
 import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
+import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
 
 
 public class ConfigurationControlManager {
@@ -56,17 +57,20 @@ public class ConfigurationControlManager {
     private final SnapshotRegistry snapshotRegistry;
     private final Map<ConfigResource.Type, ConfigDef> configDefs;
     private final Optional<AlterConfigPolicy> alterConfigPolicy;
+    private final ConfigurationValidator validator;
     private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
 
     ConfigurationControlManager(LogContext logContext,
                                 SnapshotRegistry snapshotRegistry,
                                 Map<ConfigResource.Type, ConfigDef> configDefs,
-                                Optional<AlterConfigPolicy> alterConfigPolicy) {
+                                Optional<AlterConfigPolicy> alterConfigPolicy,
+                                ConfigurationValidator validator) {
         this.log = logContext.logger(ConfigurationControlManager.class);
         this.snapshotRegistry = snapshotRegistry;
         this.configDefs = configDefs;
         this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
         this.alterConfigPolicy = alterConfigPolicy;
+        this.validator = validator;
     }
 
     /**
@@ -122,19 +126,13 @@ public class ConfigurationControlManager {
                     newValue = opValue;
                     break;
                 case DELETE:
-                    if (opValue != null) {
-                        outputResults.put(configResource, new ApiError(
-                            Errors.INVALID_REQUEST, "A DELETE op was given with a " +
-                            "non-null value."));
-                        return;
-                    }
                     newValue = null;
                     break;
                 case APPEND:
                 case SUBTRACT:
                     if (!isSplittable(configResource.type(), key)) {
                         outputResults.put(configResource, new ApiError(
-                            Errors.INVALID_CONFIG, "Can't " + opType + " to " +
+                            INVALID_CONFIG, "Can't " + opType + " to " +
                             "key " + key + " because its type is not LIST."));
                         return;
                     }
@@ -157,7 +155,7 @@ public class ConfigurationControlManager {
                     setValue(newValue), CONFIG_RECORD.highestSupportedVersion()));
             }
         }
-        error = checkAlterConfigPolicy(configResource, newRecords);
+        error = validateAlterConfig(configResource, newRecords);
         if (error.isFailure()) {
             outputResults.put(configResource, error);
             return;
@@ -166,9 +164,8 @@ public class ConfigurationControlManager {
         outputResults.put(configResource, ApiError.NONE);
     }
 
-    private ApiError checkAlterConfigPolicy(ConfigResource configResource,
-                                            List<ApiMessageAndVersion> newRecords) {
-        if (!alterConfigPolicy.isPresent()) return ApiError.NONE;
+    private ApiError validateAlterConfig(ConfigResource configResource,
+                                         List<ApiMessageAndVersion> newRecords) {
         Map<String, String> newConfigs = new HashMap<>();
         TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
         if (existingConfigs != null) newConfigs.putAll(existingConfigs);
@@ -181,9 +178,14 @@ public class ConfigurationControlManager {
             }
         }
         try {
-            alterConfigPolicy.get().validate(new RequestMetadata(configResource, newConfigs));
-        } catch (PolicyViolationException e) {
-            return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
+            validator.validate(configResource, newConfigs);
+            if (alterConfigPolicy.isPresent()) {
+                alterConfigPolicy.get().validate(new RequestMetadata(configResource, newConfigs));
+            }
+        } catch (ConfigException e) {
+            return new ApiError(INVALID_CONFIG, e.getMessage());
+        } catch (Throwable e) {
+            return ApiError.fromThrowable(e);
         }
         return ApiError.NONE;
     }
@@ -246,7 +248,7 @@ public class ConfigurationControlManager {
                     setValue(null), CONFIG_RECORD.highestSupportedVersion()));
             }
         }
-        error = checkAlterConfigPolicy(configResource, newRecords);
+        error = validateAlterConfig(configResource, newRecords);
         if (error.isFailure()) {
             outputResults.put(configResource, error);
             return;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
new file mode 100644
index 0000000..b14580a
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.Map;
+
+
+public interface ConfigurationValidator {
+    ConfigurationValidator NO_OP = (__, ___) -> { };
+
+    /**
+     * Throws an ApiException if a configuration is invalid for the given resource.
+     *
+     * @param resource      The configuration resource.
+     * @param config        The new configuration.
+     */
+    void validate(ConfigResource resource, Map<String, String> config);
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 1a1c5d0..16b3ab3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -140,6 +140,7 @@ public final class QuorumController implements Controller {
         private ControllerMetrics controllerMetrics = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
         private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
+        private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
 
         public Builder(int nodeId) {
             this.nodeId = nodeId;
@@ -215,6 +216,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) {
+            this.configurationValidator = configurationValidator;
+            return this;
+        }
+
         @SuppressWarnings("unchecked")
         public QuorumController build() throws Exception {
             if (raftClient == null) {
@@ -237,7 +243,7 @@ public final class QuorumController implements Controller {
                     raftClient, supportedFeatures, defaultReplicationFactor,
                     defaultNumPartitions, replicaPlacer, snapshotMaxNewRecordBytes,
                     sessionTimeoutNs, controllerMetrics, createTopicPolicy,
-                    alterConfigPolicy);
+                    alterConfigPolicy, configurationValidator);
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
                 throw e;
@@ -1114,7 +1120,8 @@ public final class QuorumController implements Controller {
                              long sessionTimeoutNs,
                              ControllerMetrics controllerMetrics,
                              Optional<CreateTopicPolicy> createTopicPolicy,
-                             Optional<AlterConfigPolicy> alterConfigPolicy) {
+                             Optional<AlterConfigPolicy> alterConfigPolicy,
+                             ConfigurationValidator configurationValidator) {
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
@@ -1124,7 +1131,7 @@ public final class QuorumController implements Controller {
         this.snapshotRegistry = new SnapshotRegistry(logContext);
         this.purgatory = new ControllerPurgatory();
         this.configurationControl = new ConfigurationControlManager(logContext,
-            snapshotRegistry, configDefs, alterConfigPolicy);
+            snapshotRegistry, configDefs, alterConfigPolicy, configurationValidator);
         this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
         this.clusterControl = new ClusterControlManager(logContext, time,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 349a6b6..5462dea 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -369,7 +369,8 @@ public class ReplicationControlManager {
 
         // Identify topics that already exist and mark them with the appropriate error
         request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
-                .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS)));
+                .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS,
+                    "Topic '" + t.name() + "' already exists.")));
 
         // Verify that the configurations for the new topics are OK, and figure out what
         // ConfigRecords should be created.
@@ -388,7 +389,12 @@ public class ReplicationControlManager {
         Map<String, CreatableTopicResult> successes = new HashMap<>();
         for (CreatableTopic topic : request.topics()) {
             if (topicErrors.containsKey(topic.name())) continue;
-            ApiError error = createTopic(topic, records, successes);
+            ApiError error;
+            try {
+                error = createTopic(topic, records, successes);
+            } catch (ApiException e) {
+                error = ApiError.fromThrowable(e);
+            }
             if (error.isFailure()) {
                 topicErrors.put(topic.name(), error);
             }
@@ -472,11 +478,7 @@ public class ReplicationControlManager {
             if (error.isFailure()) return error;
         } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
             return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
-                "Replication factor was set to an invalid non-positive value.");
-        } else if (!topic.assignments().isEmpty()) {
-            return new ApiError(INVALID_REQUEST,
-                "Replication factor was not set to -1 but a manual partition " +
-                    "assignment was specified.");
+                "Replication factor must be larger than 0, or -1 to use the default value.");
         } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
             return new ApiError(Errors.INVALID_PARTITIONS,
                 "Number of partitions was set to an invalid non-positive value.");
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index b2be6eb..f84b12e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -47,6 +47,7 @@ import static java.util.Arrays.asList;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER_LOGGER;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -93,7 +94,7 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
-                Optional.empty());
+                Optional.empty(), ConfigurationValidator.NO_OP);
         assertEquals(Collections.emptyMap(), manager.getConfigs(BROKER0));
         manager.replay(new ConfigRecord().
             setResourceType(BROKER.id()).setResourceName("0").
@@ -152,17 +153,29 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
-                Optional.empty());
+                Optional.empty(), ConfigurationValidator.NO_OP);
+
+        ControllerResult<Map<ConfigResource, ApiError>> result = manager.
+            incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
+                entry("baz", entry(SUBTRACT, "abc")),
+                entry("quux", entry(SET, "abc")))),
+                entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))));
+
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
                     setName("abc").setValue("123"), (short) 0)),
-                toMap(entry(BROKER0, new ApiError(Errors.INVALID_REQUEST,
-                            "A DELETE op was given with a non-null value.")),
-                    entry(MYTOPIC, ApiError.NONE))),
-            manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
-                    entry("foo.bar", entry(DELETE, "abc")),
-                    entry("quux", entry(SET, "abc")))),
-                entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123")))))));
+                toMap(entry(BROKER0, new ApiError(Errors.INVALID_CONFIG,
+                            "Can't SUBTRACT to key baz because its type is not LIST.")),
+                    entry(MYTOPIC, ApiError.NONE))), result);
+
+        RecordTestUtils.replayAll(manager, result.records());
+
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+                new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+                    setName("abc").setValue(null), (short) 0)),
+                toMap(entry(MYTOPIC, ApiError.NONE))),
+            manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
+                entry("abc", entry(DELETE, "xyz")))))));
     }
 
     private static class MockAlterConfigsPolicy implements AlterConfigPolicy {
@@ -206,7 +219,8 @@ public class ConfigurationControlManagerTest {
             new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"),
                 entry("quux", "456")))));
         ConfigurationControlManager manager = new ConfigurationControlManager(
-            new LogContext(), snapshotRegistry, CONFIGS, Optional.of(policy));
+            new LogContext(), snapshotRegistry, CONFIGS, Optional.of(policy),
+            ConfigurationValidator.NO_OP);
 
         assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
                 new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
@@ -231,7 +245,7 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
-                Optional.empty());
+                Optional.empty(), ConfigurationValidator.NO_OP);
         assertTrue(manager.isSplittable(BROKER, "foo.bar"));
         assertFalse(manager.isSplittable(BROKER, "baz"));
         assertFalse(manager.isSplittable(BROKER, "foo.baz.quux"));
@@ -244,7 +258,7 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
-                Optional.empty());
+                Optional.empty(), ConfigurationValidator.NO_OP);
         assertEquals("1", manager.getConfigValueDefault(BROKER, "foo.bar"));
         assertEquals(null, manager.getConfigValueDefault(BROKER, "foo.baz.quux"));
         assertEquals(null, manager.getConfigValueDefault(TOPIC, "abc"));
@@ -256,7 +270,7 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS,
-                Optional.empty());
+                Optional.empty(), ConfigurationValidator.NO_OP);
         List<ApiMessageAndVersion> expectedRecords1 = asList(
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 6543073..94fbe7c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -137,7 +137,8 @@ public class ReplicationControlManagerTest {
             logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
             new StripedReplicaPlacer(random), metrics);
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
-            new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty());
+            new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty(),
+                (__, ___) -> { });
         final ReplicationControlManager replicationControl;
 
         void replay(List<ApiMessageAndVersion> records) throws Exception {
@@ -415,7 +416,7 @@ public class ReplicationControlManagerTest {
         CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
         expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
                 setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
-                setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage()));
+                setErrorMessage("Topic 'foo' already exists."));
         assertEquals(expectedResponse3, result3.response());
         Uuid fooId = result2.response().topics().find("foo").topicId();
         RecordTestUtils.assertBatchIteratorContains(asList(