Posted to commits@kafka.apache.org by js...@apache.org on 2022/01/22 00:02:35 UTC

[kafka] branch trunk updated: KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 68a1953  KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)
68a1953 is described below

commit 68a19539cf1fcd86787960d0010b672d0d611b91
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Jan 21 16:00:21 2022 -0800

    KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)
    
    Currently, KRaft does not support setting BROKER_LOGGER configs (the operation always
    fails). Additionally, there are several bugs in the handling of BROKER configs: they
    are not properly validated on the forwarding broker, and the way we apply them is
    buggy as well. This PR fixes those issues.
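
    For reference, this is the kind of client operation the fix enables in KRaft mode
    (a minimal sketch, not part of this commit; it assumes a broker reachable at
    localhost:9092 whose node id is 1):

        import java.util.{Collections, Properties}
        import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
        import org.apache.kafka.common.config.ConfigResource

        object AlterBrokerLoggerExample extends App {
          val props = new Properties()
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
          val admin = Admin.create(props)
          try {
            // BROKER_LOGGER resources are named after the id of the broker whose
            // log4j levels are being changed.
            val resource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1")
            val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(
              new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApis", "DEBUG"),
                AlterConfigOp.OpType.SET))
            // Before this fix, this call always failed against a KRaft cluster.
            admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get()
          } finally admin.close()
        }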
    
    KafkaApis: add support for doing validation and log4j processing on the forwarding broker. This
    involves breaking the config request apart and forwarding only part of it. Adjust KafkaApisTest to
    test the new behavior, rather than expecting forwarding of the full request.
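
    In outline, the new broker-side flow (condensed from handleIncrementalAlterConfigsRequest
    in the diff below; sendResponse is the local response-building helper shown there):

        val preprocessed = configManager.preprocess(original.data(),
          (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, rType, rName))
        val remaining = ConfigAdminManager.copyWithoutPreprocessed(original.data(), preprocessed)
        if (remaining.resources().isEmpty) {
          // Every resource was handled during preprocessing; no controller round trip needed.
          sendResponse(Some(new IncrementalAlterConfigsResponseData()))
        } else if (!request.isForwarded && metadataSupport.canForward()) {
          // Forward only the resources that still need to be persisted.
          metadataSupport.forwardingManager.get.forwardRequest(request,
            new IncrementalAlterConfigsRequest(remaining, request.header.apiVersion()),
            response => sendResponse(response.map(_.data())))
        } else {
          // ZK mode when forwarding is not possible (e.g. this broker is the
          // active controller): persist locally.
          sendResponse(Some(processIncrementalAlterConfigsRequest(request, remaining)))
        }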
    
    MetadataSupport: remove MetadataSupport#controllerId since it duplicates the functionality of
    MetadataCache#controllerId. Add support for getResourceConfig and maybeForward.
    
    ControllerApis: log an error message if the handler throws an exception, just like we do in
    KafkaApis.
    
    ControllerConfigurationValidator: add JavaDoc.
    
    Move some functions that don't involve ZK from ZkAdminManager to DynamicConfigManager. Move some
    validation out of ZkAdminManager and into a new class, ConfigAdminManager, which is not tied to ZK.
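
    To illustrate the relocated logic: prepareIncrementalConfigs (now on the
    ConfigAdminManager companion object) applies SET/DELETE/APPEND/SUBTRACT operations to
    a Properties snapshot. A small sketch, assuming it is compiled against kafka core
    (log.cleanup.policy is a LIST-type broker config, so APPEND is legal):

        import java.util.Properties
        import kafka.server.{ConfigAdminManager, KafkaConfig}
        import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}

        object PrepareIncrementalConfigsDemo extends App {
          val props = new Properties()
          props.setProperty("log.cleanup.policy", "delete")
          // APPEND adds to the existing list value; SUBTRACT would remove from it.
          val ops = Seq(new AlterConfigOp(
            new ConfigEntry("log.cleanup.policy", "compact"), AlterConfigOp.OpType.APPEND))
          ConfigAdminManager.prepareIncrementalConfigs(ops, props, KafkaConfig.configKeys)
          println(props.getProperty("log.cleanup.policy"))  // prints "delete,compact"
        }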
    
    ForwardingManager: add support for sending new requests, rather than just forwarding existing
    requests.
    
    BrokerMetadataPublisher: do not try to apply dynamic configurations for brokers other than the
    current one. Log an INFO message when applying a new dynamic config, like we do in ZK mode. Also,
    invoke reloadUpdatedFilesWithoutConfigChange when applying a new non-default BROKER config.
    
    QuorumController: fix a bug in ConfigResourceExistenceChecker which prevented cluster configs from
    being set. Add a test for this class.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>
---
 .../requests/IncrementalAlterConfigsRequest.java   |   2 +-
 .../scala/kafka/server/ConfigAdminManager.scala    | 518 +++++++++++++++++++++
 .../main/scala/kafka/server/ConfigHandler.scala    |   2 +-
 .../main/scala/kafka/server/ControllerApis.scala   |   8 +-
 .../server/ControllerConfigurationValidator.scala  |  14 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   2 -
 .../scala/kafka/server/ForwardingManager.scala     |  76 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 128 +++--
 core/src/main/scala/kafka/server/KafkaServer.scala |   7 +-
 .../main/scala/kafka/server/MetadataSupport.scala  |  45 +-
 .../main/scala/kafka/server/ZkAdminManager.scala   | 121 +----
 ...icConfigManager.scala => ZkConfigManager.scala} |  13 +-
 .../server/metadata/BrokerMetadataListener.scala   |   3 +
 .../server/metadata/BrokerMetadataPublisher.scala  |  49 +-
 .../kafka/server/metadata/ZkConfigRepository.scala |  11 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |  16 +-
 .../kafka/server/KRaftClusterTest.scala            | 260 ++++++++++-
 .../unit/kafka/server/ConfigAdminManagerTest.scala | 466 ++++++++++++++++++
 .../kafka/server/DynamicConfigChangeTest.scala     |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  93 +++-
 .../apache/kafka/controller/QuorumController.java  |   8 +-
 .../kafka/controller/QuorumControllerTest.java     |  39 ++
 22 files changed, 1617 insertions(+), 266 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
index 2bc5914..9433e31 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -80,7 +80,7 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest {
     private final IncrementalAlterConfigsRequestData data;
     private final short version;
 
-    private IncrementalAlterConfigsRequest(IncrementalAlterConfigsRequestData data, short version) {
+    public IncrementalAlterConfigsRequest(IncrementalAlterConfigsRequestData data, short version) {
         super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version);
         this.data = data;
         this.version = version;
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
new file mode 100644
index 0000000..a7f5c6b
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -0,0 +1,518 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.server.metadata.ConfigRepository
+import kafka.utils.Log4jController
+import kafka.utils._
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidConfigurationException, InvalidRequestException}
+import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
+import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterableConfig => IAlterableConfig}
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, UNKNOWN_SERVER_ERROR}
+import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.common.resource.{Resource, ResourceType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+/**
+ * Manages dynamic configuration operations on the broker.
+ *
+ * There are two RPCs that alter KIP-226 dynamic configurations: alterConfigs, and
+ * incrementalAlterConfigs. The main difference between the two is that alterConfigs sets
+ * all configurations related to a specific config resource, whereas
+ * incrementalAlterConfigs makes only designated changes.
+ *
+ * The original, non-incremental AlterConfigs is deprecated because there are inherent
+ * race conditions when multiple clients use it. It deletes any resource configuration
+ * keys that are not specified. This leads to clients trying to do read-modify-write
+ * cycles when they only want to change one config key. (But even read-modify-write doesn't
+ * work correctly, since "sensitive" configurations are omitted when read.)
+ *
+ * KIP-412 added support for changing log4j log levels via IncrementalAlterConfigs, but
+ * not via the original AlterConfigs. In retrospect, this would have been better off as a
+ * separate RPC, since the semantics are quite different. In particular, KIP-226 configs
+ * are stored durably (in ZK or KRaft) and persist across broker restarts, but KIP-412
+ * log4j levels do not. However, we have to handle it here now in order to maintain
+ * compatibility.
+ *
+ * Configuration processing is split into two parts.
+ * - The first step, called "preprocessing," handles setting KIP-412 log levels and
+ * validating BROKER configurations. We also filter out invalid inputs here, such as
+ * UNKNOWN resource types.
+ * - The second step is "persistence," and handles storing the configurations durably to our
+ * metadata store.
+ *
+ * When KIP-590 forwarding is active (such as in KRaft mode), preprocessing will happen
+ * on the broker, while persistence will happen on the active controller. (If KIP-590
+ * forwarding is not active, then both steps are done on the same broker.)
+ *
+ * In KRaft mode, the active controller performs its own configuration validation step in
+ * {@link kafka.server.ControllerConfigurationValidator}. This is mainly important for
+ * TOPIC resources, since we already validated changes to BROKER resources on the
+ * forwarding broker. The KRaft controller is also responsible for enforcing the configured
+ * {@link org.apache.kafka.server.policy.AlterConfigPolicy}.
+ */
+class ConfigAdminManager(nodeId: Int,
+                         conf: KafkaConfig,
+                         configRepository: ConfigRepository) extends Logging {
+  import ConfigAdminManager._
+
+  this.logIdent = "[ConfigAdminManager[nodeId=" + nodeId + "]: "
+
+  /**
+   * Preprocess an incremental configuration operation on the broker. This step handles
+   * setting log4j levels, as well as filtering out some invalid resource requests that
+   * should not be forwarded to the controller.
+   *
+   * @param request     The request data.
+   * @param authorize   A callback which is invoked when we need to authorize an operation.
+   *                    Currently, we only use this for log4j operations. Other types of
+   *                    operations are authorized in the persistence step. The arguments
+   *                    are the type and name of the resource to be authorized.
+   *
+   * @return            A map from resources to errors. If a resource appears in this map,
+   *                    it has been preprocessed and does not need further processing.
+   */
+  def preprocess(
+    request: IncrementalAlterConfigsRequestData,
+    authorize: (ResourceType, String) => Boolean
+  ): util.IdentityHashMap[IAlterConfigsResource, ApiError] = {
+    val results = new util.IdentityHashMap[IAlterConfigsResource, ApiError]()
+    val resourceIds = new util.HashMap[(Byte, String), IAlterConfigsResource]
+    request.resources().forEach(resource => {
+      val preexisting = resourceIds.put((resource.resourceType(), resource.resourceName()), resource)
+      if (preexisting != null) {
+        Seq(preexisting, resource).foreach(
+          r => results.put(r, new ApiError(INVALID_REQUEST, "Each resource must appear at most once.")))
+      }
+    })
+    request.resources().forEach(resource => {
+      if (!results.containsKey(resource)) {
+        val resourceType = ConfigResource.Type.forId(resource.resourceType())
+        val configResource = new ConfigResource(resourceType, resource.resourceName())
+        try {
+          if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
+            throw new InvalidRequestException("Error due to duplicate config keys")
+          }
+          val nullUpdates = new util.ArrayList[String]()
+          resource.configs().forEach { config =>
+            if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
+              config.value() == null) {
+              nullUpdates.add(config.name())
+            }
+          }
+          if (!nullUpdates.isEmpty()) {
+            throw new InvalidRequestException("Null value not supported for : " +
+              String.join(", ", nullUpdates))
+          }
+          resourceType match {
+            case BROKER_LOGGER =>
+              if (!authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME)) {
+                throw new ClusterAuthorizationException(Errors.CLUSTER_AUTHORIZATION_FAILED.message())
+              }
+              validateResourceNameIsCurrentNodeId(resource.resourceName())
+              validateLogLevelConfigs(resource.configs())
+              if (!request.validateOnly()) {
+                alterLogLevelConfigs(resource.configs())
+              }
+              results.put(resource, ApiError.NONE)
+            case BROKER =>
+              // The resource name must be either blank (if setting a cluster config) or
+              // the ID of this specific broker.
+              if (!configResource.name().isEmpty) {
+                validateResourceNameIsCurrentNodeId(resource.resourceName())
+              }
+              validateBrokerConfigChange(resource, configResource)
+            case TOPIC =>
+            // Nothing to do.
+            case _ =>
+              throw new InvalidRequestException(s"Unknown resource type ${resource.resourceType().toInt}")
+          }
+        } catch {
+          case t: Throwable => {
+            val err = ApiError.fromThrowable(t)
+            info(s"Error preprocessing incrementalAlterConfigs request on ${configResource}", t)
+            results.put(resource, err)
+          }
+        }
+      }
+    })
+    results
+  }
+
+  def validateBrokerConfigChange(
+    resource: IAlterConfigsResource,
+    configResource: ConfigResource
+  ): Unit = {
+    val perBrokerConfig = !configResource.name().isEmpty
+    val persistentProps = configRepository.config(configResource)
+    val configProps = conf.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
+    val alterConfigOps = resource.configs().asScala.map {
+      case config =>
+        val opType = AlterConfigOp.OpType.forId(config.configOperation())
+        if (opType == null) {
+          throw new InvalidRequestException(s"Unknown operations type ${config.configOperation}")
+        }
+        new AlterConfigOp(new ConfigEntry(config.name(), config.value()), opType)
+    }.toSeq
+    prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
+    try {
+      validateBrokerConfigChange(configProps, configResource)
+    } catch {
+      case t: Throwable => error(s"validation of configProps ${configProps} for ${configResource} failed with exception", t)
+        throw t
+    }
+  }
+
+  def validateBrokerConfigChange(
+    props: Properties,
+    configResource: ConfigResource
+  ): Unit = {
+    try {
+      conf.dynamicConfig.validate(props, !configResource.name().isEmpty)
+    } catch {
+      case e: ApiException => throw e
+      //KAFKA-13609: InvalidRequestException is not really the right exception here if the
+      // configuration fails validation. The configuration is still well-formed, but just
+      // can't be applied. It should probably throw InvalidConfigurationException. However,
+      // we should probably only change this in a KIP since it has compatibility implications.
+      case e: Throwable => throw new InvalidRequestException(e.getMessage)
+    }
+  }
+
+  /**
+   * Preprocess a legacy configuration operation on the broker.
+   *
+   * @param request     The request data.
+   *
+   * @return            A map from resources to errors. If a resource appears in this
+   *                    map, it has been preprocessed and does not need further processing.
+   */
+  def preprocess(
+    request: AlterConfigsRequestData,
+  ): util.IdentityHashMap[LAlterConfigsResource, ApiError] = {
+    val results = new util.IdentityHashMap[LAlterConfigsResource, ApiError]()
+    val resourceIds = new util.HashMap[(Byte, String), LAlterConfigsResource]
+    request.resources().forEach(resource => {
+      val preexisting = resourceIds.put((resource.resourceType(), resource.resourceName()), resource)
+      if (preexisting != null) {
+        Seq(preexisting, resource).foreach(
+          r => results.put(r, new ApiError(INVALID_REQUEST, "Each resource must appear at most once.")))
+      }
+    })
+    request.resources().forEach(resource => {
+      if (!results.containsKey(resource)) {
+        val resourceType = ConfigResource.Type.forId(resource.resourceType())
+        val configResource = new ConfigResource(resourceType, resource.resourceName())
+        try {
+          if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
+            throw new InvalidRequestException("Error due to duplicate config keys")
+          }
+          val nullUpdates = new util.ArrayList[String]()
+          resource.configs().forEach { config =>
+            if (config.value() == null) {
+              nullUpdates.add(config.name())
+            }
+          }
+          if (!nullUpdates.isEmpty()) {
+            throw new InvalidRequestException("Null value not supported for : " +
+              String.join(", ", nullUpdates))
+          }
+          resourceType match {
+            case BROKER =>
+              if (!configResource.name().isEmpty) {
+                validateResourceNameIsCurrentNodeId(resource.resourceName())
+              }
+              validateBrokerConfigChange(resource, configResource)
+            case TOPIC =>
+            // Nothing to do.
+            case _ =>
+              // Since legacy AlterConfigs does not support BROKER_LOGGER, any attempt to use it
+              // gets caught by this clause.
+              throw new InvalidRequestException(s"Unknown resource type ${resource.resourceType().toInt}")
+          }
+        } catch {
+          case t: Throwable => {
+            val err = ApiError.fromThrowable(t)
+            info(s"Error preprocessing alterConfigs request on ${configResource}: ${err}")
+            results.put(resource, err)
+          }
+        }
+      }
+    })
+    results
+  }
+
+  def validateBrokerConfigChange(
+    resource: LAlterConfigsResource,
+    configResource: ConfigResource
+  ): Unit = {
+    val props = new Properties()
+    resource.configs().forEach {
+      config => props.setProperty(config.name(), config.value())
+    }
+    validateBrokerConfigChange(props, configResource)
+  }
+
+  def validateResourceNameIsCurrentNodeId(name: String): Unit = {
+    val id = try name.toInt catch {
+      case _: NumberFormatException =>
+        throw new InvalidRequestException(s"Node id must be an integer, but it is: $name")
+    }
+    if (id != nodeId) {
+      throw new InvalidRequestException(s"Unexpected broker id, expected ${nodeId}, but received ${name}")
+    }
+  }
+
+  def validateLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
+    def validateLoggerNameExists(loggerName: String): Unit = {
+      if (!Log4jController.loggerExists(loggerName)) {
+        throw new InvalidConfigurationException(s"Logger $loggerName does not exist!")
+      }
+    }
+    ops.forEach { op =>
+      val loggerName = op.name
+      OpType.forId(op.configOperation()) match {
+        case OpType.SET =>
+          validateLoggerNameExists(loggerName)
+          val logLevel = op.value()
+          if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
+            val validLevelsStr = LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
+            throw new InvalidConfigurationException(
+              s"Cannot set the log level of $loggerName to $logLevel as it is not a supported log level. " +
+                s"Valid log levels are $validLevelsStr"
+            )
+          }
+        case OpType.DELETE =>
+          validateLoggerNameExists(loggerName)
+          if (loggerName == Log4jController.ROOT_LOGGER)
+            throw new InvalidRequestException(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed")
+        case OpType.APPEND => throw new InvalidRequestException(s"${OpType.APPEND} " +
+          s"operation is not allowed for the ${BROKER_LOGGER} resource")
+        case OpType.SUBTRACT => throw new InvalidRequestException(s"${OpType.SUBTRACT} " +
+          s"operation is not allowed for the ${BROKER_LOGGER} resource")
+        case _ => throw new InvalidRequestException(s"Unknown operation type ${op.configOperation()} " +
+          s"is not allowed for the ${BROKER_LOGGER} resource")
+      }
+    }
+  }
+
+  def alterLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
+    ops.forEach { op =>
+      val loggerName = op.name()
+      val logLevel = op.value()
+      OpType.forId(op.configOperation()) match {
+        case OpType.SET =>
+          info(s"Updating the log level of $loggerName to $logLevel")
+          Log4jController.logLevel(loggerName, logLevel)
+        case OpType.DELETE =>
+          info(s"Unset the log level of $loggerName")
+          Log4jController.unsetLogLevel(loggerName)
+        case _ => throw new IllegalArgumentException(
+          s"Invalid log4j configOperation: ${op.configOperation()}")
+      }
+    }
+  }
+}
+
+object ConfigAdminManager {
+  val log = LoggerFactory.getLogger(classOf[ConfigAdminManager])
+
+  /**
+   * Copy the incremental configs request data without any already-processed elements.
+   *
+   * @param request   The input request. Will not be modified.
+   * @param processed A map containing the resources that have already been processed.
+   * @return          A new request object.
+   */
+  def copyWithoutPreprocessed(
+    request: IncrementalAlterConfigsRequestData,
+    processed: util.IdentityHashMap[IAlterConfigsResource, ApiError]
+  ): IncrementalAlterConfigsRequestData = {
+    val copy = new IncrementalAlterConfigsRequestData().
+      setValidateOnly(request.validateOnly())
+    request.resources().forEach(resource => {
+      if (!processed.containsKey(resource)) {
+        copy.resources().mustAdd(resource.duplicate())
+      }
+    })
+    copy
+  }
+
+  /**
+   * Copy the legacy alter configs request data without any already-processed elements.
+   *
+   * @param request   The input request. Will not be modified.
+   * @param processed A map containing the resources that have already been processed.
+   * @return          A new request object.
+   */
+  def copyWithoutPreprocessed(
+    request: AlterConfigsRequestData,
+    processed: util.IdentityHashMap[LAlterConfigsResource, ApiError]
+  ): AlterConfigsRequestData = {
+    val copy = new AlterConfigsRequestData().
+      setValidateOnly(request.validateOnly())
+    request.resources().forEach(resource => {
+      if (!processed.containsKey(resource)) {
+        copy.resources().mustAdd(resource.duplicate())
+      }
+    })
+    copy
+  }
+
+  def reassembleIncrementalResponse(
+    original: IncrementalAlterConfigsRequestData,
+    preprocessingResponses: util.IdentityHashMap[IAlterConfigsResource, ApiError],
+    persistentResponses: IncrementalAlterConfigsResponseData
+  ): IncrementalAlterConfigsResponseData = {
+    val response = new IncrementalAlterConfigsResponseData()
+    val responsesByResource = persistentResponses.responses().iterator().asScala.map {
+      case r => (r.resourceName(), r.resourceType()) -> new ApiError(r.errorCode(), r.errorMessage())
+    }.toMap
+    original.resources().forEach(r => {
+      val err = Option(preprocessingResponses.get(r)) match {
+        case None =>
+          responsesByResource.get((r.resourceName(), r.resourceType())) match {
+            case None => log.error("The controller returned fewer results than we " +
+              s"expected. No response found for ${r}.")
+              new ApiError(UNKNOWN_SERVER_ERROR)
+            case Some(err) => err
+          }
+        case Some(err) => err
+      }
+      response.responses().add(new IAlterConfigsResourceResponse().
+        setResourceName(r.resourceName()).
+        setResourceType(r.resourceType()).
+        setErrorCode(err.error().code()).
+        setErrorMessage(err.message()))
+    })
+    response
+  }
+
+  def reassembleLegacyResponse(
+    original: AlterConfigsRequestData,
+    preprocessingResponses: util.IdentityHashMap[LAlterConfigsResource, ApiError],
+    persistentResponses: AlterConfigsResponseData
+  ): AlterConfigsResponseData = {
+    val response = new AlterConfigsResponseData()
+    val responsesByResource = persistentResponses.responses().iterator().asScala.map {
+      case r => (r.resourceName(), r.resourceType()) -> new ApiError(r.errorCode(), r.errorMessage())
+    }.toMap
+    original.resources().forEach(r => {
+      val err = Option(preprocessingResponses.get(r)) match {
+        case None =>
+          responsesByResource.get((r.resourceName(), r.resourceType())) match {
+            case None => log.error("The controller returned fewer results than we " +
+              s"expected. No response found for ${r}.")
+              new ApiError(UNKNOWN_SERVER_ERROR)
+            case Some(err) => err
+          }
+        case Some(err) => err
+      }
+      response.responses().add(new LAlterConfigsResourceResponse().
+        setResourceName(r.resourceName()).
+        setResourceType(r.resourceType()).
+        setErrorCode(err.error().code()).
+        setErrorMessage(err.message()))
+    })
+    response
+  }
+
+  def containsDuplicates[T](
+    iterable: Iterable[T]
+  ): Boolean = {
+    val previous = new util.HashSet[T]()
+    !iterable.forall(previous.add(_))
+  }
+
+  /**
+   * Convert the configuration properties for an object (broker, topic, etc.) to a Scala
+   * map. Sensitive configurations will be redacted, so that the output is suitable for
+   * logging.
+   *
+   * @param resource      The configuration resource.
+   * @param configProps   The configuration as a Properties object.
+   * @return              A map containing all the configuration keys and values, as they
+   *                      should be logged.
+   */
+  def toLoggableProps(resource: ConfigResource, configProps: Properties): Map[String, String] = {
+    configProps.asScala.map {
+      case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`, key, value))
+    }
+  }
+
+  /**
+   * Apply a series of incremental configuration operations to a set of resource properties.
+   *
+   * @param alterConfigOps  The incremental configuration operations to apply.
+   * @param configProps     The resource properties. This will be modified by this function.
+   * @param configKeys      Information about configuration key types.
+   */
+  def prepareIncrementalConfigs(
+    alterConfigOps: Seq[AlterConfigOp],
+    configProps: Properties,
+    configKeys: Map[String, ConfigKey]
+  ): Unit = {
+    def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = {
+      // Use getOrElse: apply() would throw NoSuchElementException for an unknown
+      // key, so a null check on its result can never fire.
+      val configKey = configKeys.getOrElse(configName,
+        throw new InvalidConfigurationException(s"Unknown config name: $configName"))
+      configKey.`type` == ConfigDef.Type.LIST
+    }
+
+    alterConfigOps.foreach { alterConfigOp =>
+      val configPropName = alterConfigOp.configEntry.name
+      alterConfigOp.opType() match {
+        case OpType.SET => configProps.setProperty(alterConfigOp.configEntry.name, alterConfigOp.configEntry.value)
+        case OpType.DELETE => configProps.remove(alterConfigOp.configEntry.name)
+        case OpType.APPEND => {
+          if (!listType(alterConfigOp.configEntry.name, configKeys))
+            throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
+          val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
+            .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
+            .getOrElse("")
+            .split(",").toList
+          val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList
+          configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
+        }
+        case OpType.SUBTRACT => {
+          if (!listType(alterConfigOp.configEntry.name, configKeys))
+            throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
+          val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
+            .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
+            .getOrElse("")
+            .split(",").toList
+          val newValueList = oldValueList.diff(alterConfigOp.configEntry.value.split(",").toList)
+          configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
+        }
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ab8639b..2fe49ad 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -42,7 +42,7 @@ import scala.collection.Seq
 import scala.util.Try
 
 /**
-  * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
+  * The ConfigHandler is used to process broker configuration change notifications.
   */
 trait ConfigHandler {
   def processConfigChanges(entityName: String, value: Properties): Unit
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index ed9b55a..5681232 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -112,8 +112,12 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
     } catch {
       case e: FatalExitError => throw e
-      case e: ExecutionException => requestHelper.handleError(request, e.getCause)
-      case e: Throwable => requestHelper.handleError(request, e)
+      case e: Throwable => {
+        val t = if (e.isInstanceOf[ExecutionException]) e.getCause() else e
+        error(s"Unexpected error handling request ${request.requestDesc(true)} " +
+          s"with context ${request.context}", t)
+        requestHelper.handleError(request, t)
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 59a1f3c..5cc075e 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -29,6 +29,20 @@ import org.apache.kafka.common.internals.Topic
 
 import scala.collection.mutable
 
+/**
+ * The validator that the controller uses for dynamic configuration changes.
+ * It performs the generic validation, which can't be bypassed. If an AlterConfigPolicy
+ * is configured, the controller consults it only after this generic validation passes.
+ *
+ * For changes to BROKER resources, the forwarding broker performs an extra validation step
+ * in {@link kafka.server.ConfigAdminManager#preprocess()} before sending the change to
+ * the controller. Therefore, the validation here is just a kind of sanity check, which
+ * should never fail under normal conditions.
+ *
+ * This validator does not handle changes to BROKER_LOGGER resources. Despite being bundled
+ * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
+ * as the others. It is not persisted to the metadata log (or to ZK, when we're in that mode).
+ */
 class ControllerConfigurationValidator extends ConfigurationValidator {
   override def validate(resource: ConfigResource, config: util.Map[String, String]): Unit = {
     resource.`type`() match {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 1f204e2..cb6cd84 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -252,8 +252,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
 
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
-    if (kafkaServer.logManager.cleaner != null)
-      addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
     addBrokerReconfigurable(kafkaServer.socketServer)
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala
index e84592b..ce7f35f 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -30,7 +30,51 @@ import scala.compat.java8.OptionConverters._
 
 trait ForwardingManager {
   def forwardRequest(
-    request: RequestChannel.Request,
+    originalRequest: RequestChannel.Request,
+    responseCallback: Option[AbstractResponse] => Unit
+  ): Unit = {
+    val buffer = originalRequest.buffer.duplicate()
+    buffer.flip()
+    forwardRequest(originalRequest.context,
+      buffer,
+      originalRequest.body[AbstractRequest],
+      () => originalRequest.toString,
+      responseCallback)
+  }
+
+  def forwardRequest(
+    originalRequest: RequestChannel.Request,
+    newRequestBody: AbstractRequest,
+    responseCallback: Option[AbstractResponse] => Unit
+  ): Unit = {
+    val buffer = newRequestBody.serializeWithHeader(originalRequest.header)
+    forwardRequest(originalRequest.context,
+      buffer,
+      newRequestBody,
+      () => originalRequest.toString,
+      responseCallback)
+  }
+
+  /**
+   * Forward given request to the active controller.
+   *
+   * @param requestContext      The request context of the original envelope request.
+   * @param requestBufferCopy   The request buffer we want to send. This should not be the original
+   *                            byte buffer from the envelope request, since we will be mutating
+   *                            the position and limit fields. It should be a copy.
+   * @param requestBody         The AbstractRequest we are sending.
+   * @param requestToString     A callback which can be invoked to produce a human-readable description
+   *                            of the request.
+   * @param responseCallback    A callback which takes in an `Option[AbstractResponse]`.
+   *                            We will call this function with Some(x) after the controller responds with x.
+   *                            Or, if the controller doesn't support the request version, we will complete
+   *                            the callback with None.
+   */
+  def forwardRequest(
+    requestContext: RequestContext,
+    requestBufferCopy: ByteBuffer,
+    requestBody: AbstractRequest,
+    requestToString: () => String,
     responseCallback: Option[AbstractResponse] => Unit
   ): Unit
 
@@ -63,32 +107,24 @@ class ForwardingManagerImpl(
   channelManager: BrokerToControllerChannelManager
 ) extends ForwardingManager with Logging {
 
-  /**
-   * Forward given request to the active controller.
-   *
-   * @param request request to be forwarded
-   * @param responseCallback callback which takes in an `Option[AbstractResponse]`, where
-   *                         None is indicating that controller doesn't support the request
-   *                         version.
-   */
   override def forwardRequest(
-    request: RequestChannel.Request,
+    requestContext: RequestContext,
+    requestBufferCopy: ByteBuffer,
+    requestBody: AbstractRequest,
+    requestToString: () => String,
     responseCallback: Option[AbstractResponse] => Unit
   ): Unit = {
-    val requestBuffer = request.buffer.duplicate()
-    requestBuffer.flip()
-    val envelopeRequest = ForwardingManager.buildEnvelopeRequest(request.context, requestBuffer)
+    val envelopeRequest = ForwardingManager.buildEnvelopeRequest(requestContext, requestBufferCopy)
 
     class ForwardingResponseHandler extends ControllerRequestCompletionHandler {
       override def onComplete(clientResponse: ClientResponse): Unit = {
-        val requestBody = request.body[AbstractRequest]
 
         if (clientResponse.versionMismatch != null) {
-          debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to request $requestBody " +
+          debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to ${requestToString()} " +
             s"due to unexpected version error", clientResponse.versionMismatch)
           responseCallback(Some(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)))
         } else if (clientResponse.authenticationException != null) {
-          debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to request $requestBody " +
+          debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to ${requestToString()} " +
             s"due to authentication error", clientResponse.authenticationException)
           responseCallback(Some(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)))
         } else {
@@ -108,10 +144,10 @@ class ForwardingManagerImpl(
               // the error directly to the client since it would not be expected. Instead we
               // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
               // on the broker.
-              debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
+              debug(s"Forwarded request ${requestToString()} failed with an error in the envelope response $envelopeError")
               requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
             } else {
-              parseResponse(envelopeResponse.responseData, requestBody, request.header)
+              parseResponse(envelopeResponse.responseData, requestBody, requestContext.header)
             }
             responseCallback(Option(response))
           }
@@ -119,8 +155,8 @@ class ForwardingManagerImpl(
       }
 
       override def onTimeout(): Unit = {
-        debug(s"Forwarding of the request $request failed due to timeout exception")
-        val response = request.body[AbstractRequest].getErrorResponse(new TimeoutException())
+        debug(s"Forwarding of the request ${requestToString()} failed due to timeout exception")
+        val response = requestBody.getErrorResponse(new TimeoutException())
         responseCallback(Option(response))
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5978321..e1c4338 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEn
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
@@ -113,6 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   val authHelper = new AuthHelper(authorizer)
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config)
+  val configManager = new ConfigAdminManager(brokerId, config, configRepository)
 
   def close(): Unit = {
     aclApis.close()
@@ -130,17 +131,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     def responseCallback(responseOpt: Option[AbstractResponse]): Unit = {
       responseOpt match {
         case Some(response) => requestHelper.sendForwardedResponse(request, response)
-        case None =>
-          info(s"The client connection will be closed due to controller responded " +
-            s"unsupported version exception during $request forwarding. " +
-            s"This could happen when the controller changed after the connection was established.")
-          requestChannel.closeConnection(request, Collections.emptyMap())
+        case None => handleInvalidVersionsDuringForwarding(request)
       }
     }
-
     metadataSupport.maybeForward(request, handler, responseCallback)
   }
 
+  private def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit = {
+    info(s"The client connection will be closed due to controller responded " +
+      s"unsupported version exception during $request forwarding. " +
+      s"This could happen when the controller changed after the connection was established.")
+    requestChannel.closeConnection(request, Collections.emptyMap())
+  }
+
   private def forwardToControllerOrFail(
     request: RequestChannel.Request
   ): Unit = {
@@ -198,7 +201,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
         case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
         case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
-        case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request, handleAlterConfigsRequest)
+        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
         case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
@@ -210,7 +213,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
         case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
-        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
+        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
         case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
         case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
         case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
@@ -1301,7 +1304,7 @@ class KafkaApis(val requestChannel: RequestChannel,
          requestThrottleMs,
          brokers.toList.asJava,
          clusterId,
-         metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+         metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
          completeTopicMetadata.asJava,
          clusterAuthorizedOperations
       ))
@@ -2621,16 +2624,45 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val alterConfigsRequest = request.body[AlterConfigsRequest]
+    val original = request.body[AlterConfigsRequest]
+    val preprocessingResponses = configManager.preprocess(original.data())
+    val remaining = ConfigAdminManager.copyWithoutPreprocessed(original.data(), preprocessingResponses)
+    def sendResponse(secondPart: Option[ApiMessage]): Unit = {
+      secondPart match {
+        case Some(result: AlterConfigsResponseData) =>
+          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+            new AlterConfigsResponse(ConfigAdminManager.reassembleLegacyResponse(
+              original.data(),
+              preprocessingResponses,
+              result).setThrottleTimeMs(requestThrottleMs)))
+        case _ => handleInvalidVersionsDuringForwarding(request)
+      }
+    }
+    if (remaining.resources().isEmpty) {
+      sendResponse(Some(new AlterConfigsResponseData()))
+    } else if ((!request.isForwarded) && metadataSupport.canForward()) {
+      metadataSupport.forwardingManager.get.forwardRequest(request,
+        new AlterConfigsRequest(remaining, request.header.apiVersion()),
+        response => sendResponse(response.map(_.data())))
+    } else {
+      sendResponse(Some(processLegacyAlterConfigsRequest(request, remaining)))
+    }
+  }
+
+  def processLegacyAlterConfigsRequest(
+    originalRequest: RequestChannel.Request,
+    data: AlterConfigsRequestData
+  ): AlterConfigsResponseData = {
+    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
+    val alterConfigsRequest = new AlterConfigsRequest(data, originalRequest.header.apiVersion())
     val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER_LOGGER =>
           throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
-          authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authHelper.authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
+          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
@@ -2638,19 +2670,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     val unauthorizedResult = unauthorizedResources.keys.map { resource =>
       resource -> configsAuthorizationApiError(resource)
     }
-    def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-      val data = new AlterConfigsResponseData()
-        .setThrottleTimeMs(requestThrottleMs)
-      (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) =>
-        data.responses().add(new AlterConfigsResourceResponse()
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-          .setResourceName(resource.name)
-          .setResourceType(resource.`type`.id))
-      }
-      new AlterConfigsResponse(data)
+    val response = new AlterConfigsResponseData()
+    (authorizedResult ++ unauthorizedResult).foreach { case (resource, error) =>
+      response.responses().add(new AlterConfigsResourceResponse()
+        .setErrorCode(error.error.code)
+        .setErrorMessage(error.message)
+        .setResourceName(resource.name)
+        .setResourceType(resource.`type`.id))
     }
-    requestHelper.sendResponseMaybeThrottle(request, responseCallback)
+    response
   }
 
   def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
@@ -2750,10 +2778,40 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
+    val original = request.body[IncrementalAlterConfigsRequest]
+    val preprocessingResponses = configManager.preprocess(original.data(),
+      (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, rType, rName))
+    val remaining = ConfigAdminManager.copyWithoutPreprocessed(original.data(), preprocessingResponses)
+
+    def sendResponse(secondPart: Option[ApiMessage]): Unit = {
+      secondPart match {
+        case Some(result: IncrementalAlterConfigsResponseData) =>
+          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+            new IncrementalAlterConfigsResponse(ConfigAdminManager.reassembleIncrementalResponse(
+              original.data(),
+              preprocessingResponses,
+              result).setThrottleTimeMs(requestThrottleMs)))
+        case _ => handleInvalidVersionsDuringForwarding(request)
+      }
+    }
 
-    val configs = alterConfigsRequest.data.resources.iterator.asScala.map { alterConfigResource =>
+    if (remaining.resources().isEmpty) {
+      sendResponse(Some(new IncrementalAlterConfigsResponseData()))
+    } else if ((!request.isForwarded) && metadataSupport.canForward()) {
+      metadataSupport.forwardingManager.get.forwardRequest(request,
+        new IncrementalAlterConfigsRequest(remaining, request.header.apiVersion()),
+        response => sendResponse(response.map(_.data())))
+    } else {
+      sendResponse(Some(processIncrementalAlterConfigsRequest(request, remaining)))
+    }
+  }
+
+  def processIncrementalAlterConfigsRequest(
+    originalRequest: RequestChannel.Request,
+    data: IncrementalAlterConfigsRequestData
+  ): IncrementalAlterConfigsResponseData = {
+    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
+    val configs = data.resources.iterator.asScala.map { alterConfigResource =>
       val configResource = new ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType),
         alterConfigResource.resourceName)
       configResource -> alterConfigResource.configs.iterator.asScala.map {
@@ -2765,20 +2823,18 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-          authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authHelper.authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
+          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
 
-    val authorizedResult = zkSupport.adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
+    val authorizedResult = zkSupport.adminManager.incrementalAlterConfigs(authorizedResources, data.validateOnly)
     val unauthorizedResult = unauthorizedResources.keys.map { resource =>
       resource -> configsAuthorizationApiError(resource)
     }
-
-    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new IncrementalAlterConfigsResponse(
-      requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
+    new IncrementalAlterConfigsResponse(0, (authorizedResult ++ unauthorizedResult).asJava).data()
   }
 
   def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -3287,7 +3343,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)
-    val controllerId = metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
+    val controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
 
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
       val data = new DescribeClusterResponseData()
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d384427..bd899e1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -121,7 +121,7 @@ class KafkaServer(
   var tokenManager: DelegationTokenManager = null
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
-  var dynamicConfigManager: DynamicConfigManager = null
+  var dynamicConfigManager: ZkConfigManager = null
   var credentialProvider: CredentialProvider = null
   var tokenCache: DelegationTokenCache = null
 
@@ -233,7 +233,7 @@ class KafkaServer(
         this.logIdent = logContext.logPrefix
 
         // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
-        // applied after DynamicConfigManager starts.
+        // applied after ZkConfigManager starts.
         config.dynamicConfig.initialize(Some(zkClient))
 
         /* start scheduler */
@@ -428,6 +428,7 @@ class KafkaServer(
 
         /* Add all reconfigurables for config change notification before starting config handlers */
         config.dynamicConfig.addReconfigurables(this)
+        Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
 
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, Some(kafkaController)),
@@ -437,7 +438,7 @@ class KafkaServer(
                                                            ConfigType.Ip -> new IpConfigHandler(socketServer.connectionQuotas))
 
         // Create the config manager. start listening to notifications
-        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
+        dynamicConfigManager = new ZkConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
         socketServer.startProcessingRequests(authorizerFutures)
diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala
index ecacffa..aeedd40 100644
--- a/core/src/main/scala/kafka/server/MetadataSupport.scala
+++ b/core/src/main/scala/kafka/server/MetadataSupport.scala
@@ -56,11 +56,19 @@ sealed trait MetadataSupport {
    */
   def ensureConsistentWith(config: KafkaConfig): Unit
 
-  def maybeForward(request: RequestChannel.Request,
-                   handler: RequestChannel.Request => Unit,
-                   responseCallback: Option[AbstractResponse] => Unit): Unit
-
-  def controllerId: Option[Int]
+  def canForward(): Boolean
+
+  def maybeForward(
+    request: RequestChannel.Request,
+    handler: RequestChannel.Request => Unit,
+    responseCallback: Option[AbstractResponse] => Unit
+  ): Unit = {
+    if (!request.isForwarded && canForward()) {
+      forwardingManager.get.forwardRequest(request, responseCallback)
+    } else {
+      handler(request)
+    }
+  }
 }
 
 case class ZkSupport(adminManager: ZkAdminManager,
@@ -79,16 +87,7 @@ case class ZkSupport(adminManager: ZkAdminManager,
     }
   }
 
-  override def maybeForward(request: RequestChannel.Request,
-                            handler: RequestChannel.Request => Unit,
-                            responseCallback: Option[AbstractResponse] => Unit): Unit = {
-    forwardingManager match {
-      case Some(mgr) if !request.isForwarded && !controller.isActive => mgr.forwardRequest(request, responseCallback)
-      case _ => handler(request)
-    }
-  }
-
-  override def controllerId: Option[Int] =  metadataCache.getControllerId
+  override def canForward(): Boolean = forwardingManager.isDefined && (!controller.isActive)
 }
 
 case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: KRaftMetadataCache)
@@ -103,19 +102,5 @@ case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: KRaftMetadataCa
     }
   }
 
-  override def maybeForward(request: RequestChannel.Request,
-                            handler: RequestChannel.Request => Unit,
-                            responseCallback: Option[AbstractResponse] => Unit): Unit = {
-    if (!request.isForwarded) {
-      fwdMgr.forwardRequest(request, responseCallback)
-    } else {
-      handler(request) // will reject
-    }
-  }
-
-  /**
-   * Get the broker ID to return from a MetadataResponse. This will be a broker ID, as
-   * described in KRaftMetadataCache#getControllerId. See that function for more details.
-   */
-  override def controllerId: Option[Int] = metadataCache.getControllerId
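+  // KRaft brokers can always forward requests to the active controller.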
+  override def canForward(): Boolean = true
 }
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index d2e7456..465004d 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -22,18 +22,16 @@ import java.util.Properties
 import kafka.admin.{AdminOperationException, AdminUtils}
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
-import kafka.utils.Log4jController
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps}
 import kafka.server.DynamicConfig.QuotaConfigs
 import kafka.server.metadata.ZkConfigRepository
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.config.ConfigDef.ConfigKey
-import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
 import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
 import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
@@ -388,14 +386,10 @@ class ZkAdminManager(val config: KafkaConfig,
     }
   }
 
-    def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
+  def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
     configs.map { case (resource, config) =>
 
       try {
-        val nullUpdates = config.entries.asScala.filter(_.value == null).map(_.name)
-        if (nullUpdates.nonEmpty)
-          throw new InvalidRequestException(s"Null value not supported for : ${nullUpdates.mkString(",")}")
-
         val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
 
         val configProps = new Properties
@@ -468,29 +462,6 @@ class ZkAdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
-  private def toLoggableProps(resource: ConfigResource, configProps: Properties): Map[String, String] = {
-    configProps.asScala.map {
-      case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`, key, value))
-    }
-  }
-
-  private def alterLogLevelConfigs(alterConfigOps: Seq[AlterConfigOp]): Unit = {
-    alterConfigOps.foreach { alterConfigOp =>
-      val loggerName = alterConfigOp.configEntry().name()
-      val logLevel = alterConfigOp.configEntry().value()
-      alterConfigOp.opType() match {
-        case OpType.SET =>
-          info(s"Updating the log level of $loggerName to $logLevel")
-          Log4jController.logLevel(loggerName, logLevel)
-        case OpType.DELETE =>
-          info(s"Unset the log level of $loggerName")
-          Log4jController.unsetLogLevel(loggerName)
-        case _ => throw new IllegalArgumentException(
-          s"Log level cannot be changed for OpType: ${alterConfigOp.opType()}")
-      }
-    }
-  }
-
   private def getBrokerId(resource: ConfigResource) = {
     if (resource.name == null || resource.name.isEmpty)
       None
@@ -514,18 +485,6 @@ class ZkAdminManager(val config: KafkaConfig,
   def incrementalAlterConfigs(configs: Map[ConfigResource, Seq[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
     configs.map { case (resource, alterConfigOps) =>
       try {
-        // throw InvalidRequestException if any duplicate keys
-        val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry.name).filter { case (_, v) =>
-          v.size > 1
-        }.keySet
-        if (duplicateKeys.nonEmpty)
-          throw new InvalidRequestException(s"Error due to duplicate config keys : ${duplicateKeys.mkString(",")}")
-        val nullUpdates = alterConfigOps
-          .filter(entry => entry.configEntry.value == null && entry.opType() != OpType.DELETE)
-          .map(entry => s"${entry.opType}:${entry.configEntry.name}")
-        if (nullUpdates.nonEmpty)
-          throw new InvalidRequestException(s"Null value not supported for : ${nullUpdates.mkString(",")}")
-
         val configEntriesMap = alterConfigOps.map(entry => (entry.configEntry.name, entry.configEntry.value)).toMap
 
         resource.`type` match {
@@ -545,13 +504,6 @@ class ZkAdminManager(val config: KafkaConfig,
             prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
             alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
 
-          case ConfigResource.Type.BROKER_LOGGER =>
-            getBrokerId(resource)
-            validateLogLevelConfigs(alterConfigOps)
-
-            if (!validateOnly)
-              alterLogLevelConfigs(alterConfigOps)
-            resource -> ApiError.NONE
           case resourceType =>
             throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
         }
@@ -572,73 +524,6 @@ class ZkAdminManager(val config: KafkaConfig,
     }.toMap
   }
 
-  private def validateLogLevelConfigs(alterConfigOps: Seq[AlterConfigOp]): Unit = {
-    def validateLoggerNameExists(loggerName: String): Unit = {
-      if (!Log4jController.loggerExists(loggerName))
-        throw new ConfigException(s"Logger $loggerName does not exist!")
-    }
-
-    alterConfigOps.foreach { alterConfigOp =>
-      val loggerName = alterConfigOp.configEntry.name
-      alterConfigOp.opType() match {
-        case OpType.SET =>
-          validateLoggerNameExists(loggerName)
-          val logLevel = alterConfigOp.configEntry.value
-          if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
-            val validLevelsStr = LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
-            throw new ConfigException(
-              s"Cannot set the log level of $loggerName to $logLevel as it is not a supported log level. " +
-              s"Valid log levels are $validLevelsStr"
-            )
-          }
-        case OpType.DELETE =>
-          validateLoggerNameExists(loggerName)
-          if (loggerName == Log4jController.ROOT_LOGGER)
-            throw new InvalidRequestException(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed")
-        case OpType.APPEND => throw new InvalidRequestException(s"${OpType.APPEND} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource")
-        case OpType.SUBTRACT => throw new InvalidRequestException(s"${OpType.SUBTRACT} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource")
-      }
-    }
-  }
-
-  private def prepareIncrementalConfigs(alterConfigOps: Seq[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {
-
-    def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = {
-      val configKey = configKeys(configName)
-      if (configKey == null)
-        throw new InvalidConfigurationException(s"Unknown topic config name: $configName")
-      configKey.`type` == ConfigDef.Type.LIST
-    }
-
-    alterConfigOps.foreach { alterConfigOp =>
-      val configPropName = alterConfigOp.configEntry.name
-      alterConfigOp.opType() match {
-        case OpType.SET => configProps.setProperty(alterConfigOp.configEntry.name, alterConfigOp.configEntry.value)
-        case OpType.DELETE => configProps.remove(alterConfigOp.configEntry.name)
-        case OpType.APPEND => {
-          if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
-          val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
-            .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
-            .getOrElse("")
-            .split(",").toList
-          val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList
-          configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
-        }
-        case OpType.SUBTRACT => {
-          if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
-          val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
-            .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
-            .getOrElse("")
-            .split(",").toList
-          val newValueList = oldValueList.diff(alterConfigOp.configEntry.value.split(",").toList)
-          configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
-        }
-      }
-    }
-  }
-
   def shutdown(): Unit = {
     topicPurgatory.shutdown()
     CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/ZkConfigManager.scala
similarity index 96%
rename from core/src/main/scala/kafka/server/DynamicConfigManager.scala
rename to core/src/main/scala/kafka/server/ZkConfigManager.scala
index 3eed382..c763d3f 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/ZkConfigManager.scala
@@ -59,7 +59,7 @@ object ConfigEntityName {
  *
  * To avoid watching all topics for changes instead we have a notification path
  *   /config/changes
- * The DynamicConfigManager has a child watch on this path.
+ * The ZkConfigManager has a child watch on this path.
  *
  * To update a config we first update the config properties. Then we create a new sequential
  * znode under the change path which contains the name of the entityType and entityName that was updated, say
@@ -84,10 +84,12 @@ object ConfigEntityName {
  * on startup where a change might be missed between the initial config load and registering for change notifications.
  *
  */
-class DynamicConfigManager(private val zkClient: KafkaZkClient,
-                           private val configHandlers: Map[String, ConfigHandler],
-                           private val changeExpirationMs: Long = 15*60*1000,
-                           private val time: Time = Time.SYSTEM) extends Logging {
+class ZkConfigManager(
+  private val zkClient: KafkaZkClient,
+  private val configHandlers: Map[String, ConfigHandler],
+  private val changeExpirationMs: Long = 15*60*1000,
+  private val time: Time = Time.SYSTEM
+) extends Logging {
   val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
@@ -182,3 +184,4 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
     configChangeListener.close()
   }
 }
+
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 702d227..55fc1f8 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -252,6 +252,9 @@ class BrokerMetadataListener(
     val delta = _delta
     _image = _delta.apply()
     _delta = new MetadataDelta(_image)
+    if (isDebugEnabled) {
+      debug(s"Publishing new metadata delta ${delta} at offset ${_image.highestOffsetAndEpoch().offset}.")
+    }
     publisher.publish(delta, _image)
   }
 
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 2c56ed6..7c6d190 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -19,12 +19,12 @@ package kafka.server.metadata
 
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{UnifiedLog, LogManager}
-import kafka.server.ConfigType
-import kafka.server.{ConfigEntityName, ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.log.{LogManager, UnifiedLog}
+import kafka.server.ConfigAdminManager.toLoggableProps
+import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
 
@@ -175,19 +175,31 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 
       // Apply configuration deltas.
       Option(delta.configsDelta()).foreach { configsDelta =>
-        configsDelta.changes().keySet().forEach { configResource =>
-          val tag = configResource.`type`() match {
-            case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
-            case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
-            case _ => None
-          }
-          tag.foreach { t =>
-            val newProperties = newImage.configs().configProperties(configResource)
-            val maybeDefaultName = configResource.name() match {
-              case "" => ConfigEntityName.Default
-              case k => k
+        configsDelta.changes().keySet().forEach { resource =>
+          val props = newImage.configs().configProperties(resource)
+          resource.`type`() match {
+            case TOPIC =>
+              // Apply changes to a topic's dynamic configuration.
+              info(s"Updating topic ${resource.name()} with new configuration : " +
+                toLoggableProps(resource, props).mkString(","))
+              dynamicConfigHandlers(ConfigType.Topic).
+                processConfigChanges(resource.name(), props)
+            case BROKER => if (resource.name().isEmpty) {
+              // Apply changes to "cluster configs" (also known as default BROKER configs).
+              // These are stored in KRaft with an empty name field.
+              info(s"Updating cluster configuration : " +
+                toLoggableProps(resource, props).mkString(","))
+              dynamicConfigHandlers(ConfigType.Broker).
+                processConfigChanges(ConfigEntityName.Default, props)
+            } else if (resource.name().equals(brokerId.toString)) {
+              // Apply changes to this broker's dynamic configuration.
+              info(s"Updating broker ${brokerId} with new configuration : " +
+                toLoggableProps(resource, props).mkString(","))
+              dynamicConfigHandlers(ConfigType.Broker).
+                processConfigChanges(resource.name(), props)
+              conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
             }
-            dynamicConfigHandlers(t).processConfigChanges(maybeDefaultName, newProperties)
+            case _ => // nothing to do
           }
         }
       }
@@ -258,6 +270,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
     // recovery-from-unclean-shutdown if required.
     logManager.startup(metadataCache.getAllTopics())
 
+    // Make the LogCleaner available for reconfiguration. We can't do this any earlier,
+    // because LogManager#startup only creates the LogCleaner object if
+    // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
+    Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
+
     // Start the replica manager.
     replicaManager.startup()
 
diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
index 95fe752..8f8dfcd 100644
--- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
@@ -19,7 +19,7 @@ package kafka.server.metadata
 
 import java.util.Properties
 
-import kafka.server.ConfigType
+import kafka.server.{ConfigEntityName, ConfigType}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type
@@ -37,6 +37,13 @@ class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository
       case Type.BROKER => ConfigType.Broker
       case tpe => throw new IllegalArgumentException(s"Unsupported config type: $tpe")
     }
-    adminZkClient.fetchEntityConfig(configTypeForZk, configResource.name)
+    // ZK stores cluster configs under "<default>".
+    val effectiveName = if (configResource.`type`.equals(Type.BROKER) &&
+        configResource.name.isEmpty()) {
+      ConfigEntityName.Default
+    } else {
+      configResource.name
+    }
+    adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)
   }
 }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index bd66387..1dd4096 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,19 +26,20 @@ import java.time.Duration
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent._
+
 import javax.management.ObjectName
 import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
-import kafka.log.LogConfig
+import kafka.log.{CleanerConfig, LogConfig}
 import kafka.message.ProducerCompressionCodec
 import kafka.metrics.KafkaYammerMetrics
 import kafka.network.{Processor, RequestChannel}
 import kafka.server.QuorumTestHarness
 import kafka.utils._
 import kafka.utils.Implicits._
-import kafka.zk.{ConfigEntityChangeNotificationZNode}
+import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
@@ -485,12 +486,15 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     props.put(KafkaConfig.MessageMaxBytesProp, "40000")
     props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
     props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
-    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2"))
 
     // Verify cleaner config was updated. Wait for one of the configs to be updated and verify
     // that all the others were updated at the same time, since they are reconfigured together
-    val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
-    TestUtils.waitUntilTrue(() => newCleanerConfig.numThreads == 2, "Log cleaner not reconfigured")
+    var newCleanerConfig: CleanerConfig = null
+    TestUtils.waitUntilTrue(() => {
+      reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2"))
+      newCleanerConfig = servers.head.logManager.cleaner.currentConfig
+      newCleanerConfig.numThreads == 2
+    }, "Log cleaner not reconfigured", 60000)
     assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
     assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
     assertEquals(300000, newCleanerConfig.ioBufferSize)
@@ -1426,7 +1430,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
         Seq(new ConfigResource(ConfigResource.Type.BROKER, ""))
       brokerResources.foreach { brokerResource =>
         val exception = assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get)
-        assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
+        assertEquals(classOf[InvalidRequestException], exception.getCause().getClass())
       }
       servers.foreach { server =>
         assertEquals(oldProps, server.config.values.asScala.filter { case (k, _) => newProps.containsKey(k) })
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 25e79ad..c62dbd5 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -21,18 +21,26 @@ import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment, NewTopic}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, ConfigEntry, NewPartitionReassignment, NewTopic}
 import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.message.DescribeClusterRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.requests.{DescribeClusterRequest, DescribeClusterResponse}
+import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Tag, Test, Timeout}
-
 import java.util
+import java.util.concurrent.ExecutionException
 import java.util.{Arrays, Collections, Optional}
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type
+import org.apache.kafka.common.protocol.Errors._
+import org.slf4j.LoggerFactory
+
+import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
@@ -40,6 +48,8 @@ import scala.jdk.CollectionConverters._
 @Timeout(120)
 @Tag("integration")
 class KRaftClusterTest {
+  val log = LoggerFactory.getLogger(classOf[KRaftClusterTest])
+  val log2 = LoggerFactory.getLogger(classOf[KRaftClusterTest].getCanonicalName() + "2")
 
   @Test
   def testCreateClusterAndClose(): Unit = {
@@ -459,4 +469,248 @@ class KRaftClusterTest {
       topicsNotFound.isEmpty && extraTopics.isEmpty
     }, s"Failed to find topic(s): ${topicsNotFound.asScala} and NOT find topic(s): ${extraTopics}")
   }
+
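+  /**
+   * Issue an incrementalAlterConfigs request and collect one ApiError per resource,
+   * in the same order as the requested changes.
+   */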
+  private def incrementalAlter(
+    admin: Admin,
+    changes: Seq[(ConfigResource, Seq[AlterConfigOp])]
+  ): Seq[ApiError] = {
+    val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
+    changes.foreach {
+      case (resource, ops) => configs.put(resource, ops.asJava)
+    }
+    val values = admin.incrementalAlterConfigs(configs).values()
+    changes.map {
+      case (resource, _) => try {
+        values.get(resource).get()
+        ApiError.NONE
+      } catch {
+        case e: ExecutionException => ApiError.fromThrowable(e.getCause)
+        case t: Throwable => ApiError.fromThrowable(t)
+      }
+    }
+  }
+
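+  /**
+   * Poll describeConfigs until every resource reports the expected key/value pairs.
+   * When exhaustive is true, the resource's configuration must match the expected
+   * pairs exactly; otherwise the pairs only need to be present. Returns the last
+   * observed configuration for each resource.
+   */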
+  private def validateConfigs(
+    admin: Admin,
+    expected: Map[ConfigResource, Seq[(String, String)]],
+    exhaustive: Boolean = false
+  ): Map[ConfigResource, util.Map[String, String]] = {
+    val results = new mutable.HashMap[ConfigResource, util.Map[String, String]]()
+    TestUtils.retry(60000) {
+      try {
+        val values = admin.describeConfigs(expected.keySet.asJava).values()
+        results.clear()
+        assertEquals(expected.keySet, values.keySet().asScala)
+        expected.foreach {
+          case (resource, pairs) =>
+            val config = values.get(resource).get()
+            val actual = new util.TreeMap[String, String]()
+            val expected = new util.TreeMap[String, String]()
+            config.entries().forEach {
+              case entry =>
+                actual.put(entry.name(), entry.value())
+                if (!exhaustive) {
+                  expected.put(entry.name(), entry.value())
+                }
+            }
+            pairs.foreach {
+              case (k, v) => expected.put(k, v)
+            }
+            assertEquals(expected, actual)
+            results.put(resource, actual)
+        }
+      } catch {
+        case t: Throwable =>
+          log.warn(s"Unable to describeConfigs(${expected.keySet.asJava})", t)
+          throw t
+      }
+    }
+    results.toMap
+  }
+
+  @Test
+  def testIncrementalAlterConfigs(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
+          (new ConfigResource(Type.BROKER, ""), Seq(
+            new AlterConfigOp(new ConfigEntry("log.roll.ms", "1234567"), OpType.SET),
+            new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "6"), OpType.SET))))))
+        validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
+          ("log.roll.ms", "1234567"),
+          ("max.connections.per.ip", "6"))), true)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 2, 3.toShort),
+          new NewTopic("bar", 2, 3.toShort))).all().get()
+        TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq, "foo", 2)
+        TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq, "bar", 2)
+
+        validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "bar") -> Seq()))
+
+        assertEquals(Seq(ApiError.NONE,
+            new ApiError(INVALID_CONFIG, "Unknown topic config name: not.a.real.topic.config"),
+            new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not exist.")),
+          incrementalAlter(admin, Seq(
+            (new ConfigResource(Type.TOPIC, "foo"), Seq(
+              new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "345"), OpType.SET))),
+            (new ConfigResource(Type.TOPIC, "bar"), Seq(
+              new AlterConfigOp(new ConfigEntry("not.a.real.topic.config", "789"), OpType.SET))),
+            (new ConfigResource(Type.TOPIC, "baz"), Seq(
+              new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "678"), OpType.SET))))))
+
+        validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "foo") -> Seq(
+          ("segment.jitter.ms", "345"))))
+
+        assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
+          (new ConfigResource(Type.BROKER, "2"), Seq(
+            new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "7"), OpType.SET))))))
+
+        validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "2") -> Seq(
+          ("max.connections.per.ip", "7"))))
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testSetLog4jConfigurations(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        Seq(log, log2).foreach(_.debug("setting log4j"))
+
+        val broker2 = new ConfigResource(Type.BROKER_LOGGER, "2")
+        val broker3 = new ConfigResource(Type.BROKER_LOGGER, "3")
+        val initialLog4j = validateConfigs(admin, Map(broker2 -> Seq()))
+
+        assertEquals(Seq(ApiError.NONE,
+            new ApiError(INVALID_REQUEST, "APPEND operation is not allowed for the BROKER_LOGGER resource")),
+          incrementalAlter(admin, Seq(
+            (broker2, Seq(
+              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), OpType.SET),
+              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), OpType.SET))),
+            (broker3, Seq(
+              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), OpType.APPEND),
+              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), OpType.APPEND))))))
+
+        validateConfigs(admin, Map(broker2 -> Seq(
+          (log.getName(), "TRACE"),
+          (log2.getName(), "TRACE"))))
+
+        assertEquals(Seq(ApiError.NONE,
+          new ApiError(INVALID_REQUEST, "SUBTRACT operation is not allowed for the BROKER_LOGGER resource")),
+          incrementalAlter(admin, Seq(
+            (broker2, Seq(
+              new AlterConfigOp(new ConfigEntry(log.getName(), ""), OpType.DELETE),
+              new AlterConfigOp(new ConfigEntry(log2.getName(), ""), OpType.DELETE))),
+            (broker3, Seq(
+              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), OpType.SUBTRACT),
+              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), OpType.SUBTRACT))))))
+
+        validateConfigs(admin, Map(broker2 -> Seq(
+          (log.getName(), initialLog4j.get(broker2).get.get(log.getName())),
+          (log2.getName(), initialLog4j.get(broker2).get.get(log2.getName())))))
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
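+  /**
+   * Issue a legacy (non-incremental) alterConfigs request and collect one ApiError
+   * per resource. Unlike incrementalAlterConfigs, this API replaces the full dynamic
+   * configuration: any config omitted from the request is removed.
+   */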
+  @nowarn("cat=deprecation") // Suppress warnings about using legacy alterConfigs
+  def legacyAlter(
+    admin: Admin,
+    resources: Map[ConfigResource, Seq[ConfigEntry]]
+  ): Seq[ApiError] = {
+    val configs = new util.HashMap[ConfigResource, Config]()
+    resources.foreach {
+      case (resource, entries) => configs.put(resource, new Config(entries.asJava))
+    }
+    val values = admin.alterConfigs(configs).values()
+    resources.map {
+      case (resource, _) => try {
+        values.get(resource).get()
+        ApiError.NONE
+      } catch {
+        case e: ExecutionException => ApiError.fromThrowable(e.getCause)
+        case t: Throwable => ApiError.fromThrowable(t)
+      }
+    }.toSeq
+  }
+
+  @Test
+  def testLegacyAlterConfigs(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val defaultBroker = new ConfigResource(Type.BROKER, "")
+
+        assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker -> Seq(
+          new ConfigEntry("log.roll.ms", "1234567"),
+          new ConfigEntry("max.connections.per.ip", "6")))))
+
+        validateConfigs(admin, Map(defaultBroker -> Seq(
+          ("log.roll.ms", "1234567"),
+          ("max.connections.per.ip", "6"))), true)
+
+        assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker -> Seq(
+          new ConfigEntry("log.roll.ms", "1234567")))))
+
+        // Since max.connections.per.ip was left out of the previous legacyAlter, it is removed.
+        validateConfigs(admin, Map(defaultBroker -> Seq(
+          ("log.roll.ms", "1234567"))), true)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 2, 3.toShort),
+          new NewTopic("bar", 2, 3.toShort))).all().get()
+        TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq, "foo", 2)
+        TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq, "bar", 2)
+        assertEquals(Seq(ApiError.NONE,
+            new ApiError(INVALID_CONFIG, "Unknown topic config name: not.a.real.topic.config"),
+            new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not exist.")),
+          legacyAlter(admin, Map(
+            new ConfigResource(Type.TOPIC, "foo") -> Seq(
+              new ConfigEntry("segment.jitter.ms", "345")),
+            new ConfigResource(Type.TOPIC, "bar") -> Seq(
+              new ConfigEntry("not.a.real.topic.config", "789")),
+            new ConfigResource(Type.TOPIC, "baz") -> Seq(
+              new ConfigEntry("segment.jitter.ms", "678")))))
+
+        validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "foo") -> Seq(
+          ("segment.jitter.ms", "345"))))
+
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
new file mode 100644
index 0000000..19390c3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Collections
+
+import kafka.server.metadata.MockConfigRepository
+import kafka.utils.{Log4jController, TestUtils}
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC, UNKNOWN}
+import org.apache.kafka.common.config.LogLevelConfig.VALID_LOG_LEVELS
+import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
+import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => LAlterConfigsResourceCollection}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => LAlterableConfigCollection}
+import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => LAlterableConfig}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection => IAlterConfigsResourceCollection}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig => IAlterableConfig}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection => IAlterableConfigCollection}
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE}
+import org.apache.kafka.common.requests.ApiError
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.{Assertions, Test}
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+
+class ConfigAdminManagerTest {
+  val logger = LoggerFactory.getLogger(classOf[ConfigAdminManagerTest])
+
+  def newConfigAdminManager(brokerId: Integer): ConfigAdminManager = {
+    val config = TestUtils.createBrokerConfig(nodeId = brokerId, zkConnect = null)
+    new ConfigAdminManager(brokerId, new KafkaConfig(config), new MockConfigRepository())
+  }
+
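+  // Fixtures: alterations that set config "foo" to "bar" on broker 0 and on
+  // topic "a", in both incremental and legacy request formats.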
+  def broker0Incremental(): IAlterConfigsResource = new IAlterConfigsResource().
+    setResourceName("0").
+    setResourceType(BROKER.id()).
+    setConfigs(new IAlterableConfigCollection(
+      util.Arrays.asList(new IAlterableConfig().setName("foo").
+        setValue("bar").
+        setConfigOperation(OpType.SET.id())).iterator()))
+
+  def topicAIncremental(): IAlterConfigsResource = new IAlterConfigsResource().
+    setResourceName("a").
+    setResourceType(TOPIC.id()).
+    setConfigs(new IAlterableConfigCollection(
+      util.Arrays.asList(new IAlterableConfig().setName("foo").
+        setValue("bar").
+        setConfigOperation(OpType.SET.id())).iterator()))
+
+  def broker0Legacy(): LAlterConfigsResource = new LAlterConfigsResource().
+    setResourceName("0").
+    setResourceType(BROKER.id()).
+    setConfigs(new LAlterableConfigCollection(
+      util.Arrays.asList(new LAlterableConfig().setName("foo").
+        setValue("bar")).iterator()))
+
+  def topicALegacy(): LAlterConfigsResource = new LAlterConfigsResource().
+    setResourceName("a").
+    setResourceType(TOPIC.id()).
+    setConfigs(new LAlterableConfigCollection(
+      util.Arrays.asList(new LAlterableConfig().setName("foo").
+        setValue("bar")).iterator()))
+
+  val invalidRequestError = new ApiError(INVALID_REQUEST)
+
+  @Test
+  def testCopyWithoutPreprocessedForIncremental(): Unit = {
+    val broker0 = broker0Incremental()
+    val topicA = topicAIncremental()
+    val request = new IncrementalAlterConfigsRequestData().setValidateOnly(true).
+      setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+        broker0, topicA).iterator()))
+    val processed1 = new util.IdentityHashMap[IAlterConfigsResource, ApiError]()
+    processed1.put(broker0, ApiError.NONE)
+    assertEquals(new IncrementalAlterConfigsRequestData().setValidateOnly(true).
+      setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+        topicA.duplicate()).iterator())),
+        ConfigAdminManager.copyWithoutPreprocessed(request, processed1))
+    val processed2 = new util.IdentityHashMap[IAlterConfigsResource, ApiError]()
+    assertEquals(new IncrementalAlterConfigsRequestData().setValidateOnly(true).
+      setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+        broker0.duplicate(), topicA.duplicate()).iterator())),
+      ConfigAdminManager.copyWithoutPreprocessed(request, processed2))
+    val processed3 = new util.IdentityHashMap[IAlterConfigsResource, ApiError]()
+    processed3.put(broker0, ApiError.NONE)
+    processed3.put(topicA, ApiError.NONE)
+    assertEquals(new IncrementalAlterConfigsRequestData().setValidateOnly(true),
+      ConfigAdminManager.copyWithoutPreprocessed(request, processed3))
+  }
+
+  @Test
+  def testCopyWithoutPreprocessedForLegacy(): Unit = {
+    val broker0 = broker0Legacy()
+    val topicA = topicALegacy()
+    val request = new AlterConfigsRequestData().setValidateOnly(true).
+      setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+        broker0, topicA).iterator()))
+    val processed1 = new util.IdentityHashMap[LAlterConfigsResource, ApiError]()
+    processed1.put(broker0, ApiError.NONE)
+    assertEquals(new AlterConfigsRequestData().setValidateOnly(true).
+      setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+        topicA.duplicate()).iterator())),
+      ConfigAdminManager.copyWithoutPreprocessed(request, processed1))
+    val processed2 = new util.IdentityHashMap[LAlterConfigsResource, ApiError]()
+    assertEquals(new AlterConfigsRequestData().setValidateOnly(true).
+      setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+        broker0.duplicate(), topicA.duplicate()).iterator())),
+      ConfigAdminManager.copyWithoutPreprocessed(request, processed2))
+    val processed3 = new util.IdentityHashMap[LAlterConfigsResource, ApiError]()
+    processed3.put(broker0, ApiError.NONE)
+    processed3.put(topicA, ApiError.NONE)
+    assertEquals(new AlterConfigsRequestData().setValidateOnly(true),
+      ConfigAdminManager.copyWithoutPreprocessed(request, processed3))
+  }
+
+  @Test
+  def testReassembleIncrementalResponse(): Unit = {
+    val broker0 = broker0Incremental()
+    val topicA = topicAIncremental()
+    val original = new IncrementalAlterConfigsRequestData().
+      setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+        broker0, topicA).iterator()))
+    val preprocessed1 = new util.IdentityHashMap[IAlterConfigsResource, ApiError]()
+    preprocessed1.put(broker0, invalidRequestError)
+    val persistentResponses1 = new IncrementalAlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new IAlterConfigsResourceResponse().
+        setResourceName("a").
+        setResourceType(TOPIC.id()).
+        setErrorCode(NONE.code()).
+        setErrorMessage(null)))
+    assertEquals(new IncrementalAlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new IAlterConfigsResourceResponse().
+        setResourceName("0").
+        setResourceType(BROKER.id()).
+        setErrorCode(INVALID_REQUEST.code()).
+        setErrorMessage(INVALID_REQUEST.message()),
+        new IAlterConfigsResourceResponse().
+          setResourceName("a").
+          setResourceType(TOPIC.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null))),
+      ConfigAdminManager.reassembleIncrementalResponse(original, preprocessed1, persistentResponses1))
+    val preprocessed2 = new util.IdentityHashMap[IAlterConfigsResource, ApiError]()
+    val persistentResponses2 = new IncrementalAlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new IAlterConfigsResourceResponse().
+          setResourceName("0").
+          setResourceType(BROKER.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null),
+        new IAlterConfigsResourceResponse().
+          setResourceName("a").
+          setResourceType(TOPIC.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null),
+      ))
+    assertEquals(new IncrementalAlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new IAlterConfigsResourceResponse().
+          setResourceName("0").
+          setResourceType(BROKER.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null),
+        new IAlterConfigsResourceResponse().
+          setResourceName("a").
+          setResourceType(TOPIC.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null))),
+      ConfigAdminManager.reassembleIncrementalResponse(original, preprocessed2, persistentResponses2))
+  }
+
+  @Test
+  def testReassembleLegacyResponse(): Unit = {
+    val broker0 = broker0Legacy()
+    val topicA = topicALegacy()
+    val original = new AlterConfigsRequestData().
+      setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+        broker0, topicA).iterator()))
+    val preprocessed1 = new util.IdentityHashMap[LAlterConfigsResource, ApiError]()
+    preprocessed1.put(broker0, invalidRequestError)
+    val persistentResponses1 = new AlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new LAlterConfigsResourceResponse().
+        setResourceName("a").
+        setResourceType(TOPIC.id()).
+        setErrorCode(NONE.code()).
+        setErrorMessage(null)))
+    assertEquals(new AlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new LAlterConfigsResourceResponse().
+        setResourceName("0").
+        setResourceType(BROKER.id()).
+        setErrorCode(INVALID_REQUEST.code()).
+        setErrorMessage(INVALID_REQUEST.message()),
+        new LAlterConfigsResourceResponse().
+          setResourceName("a").
+          setResourceType(TOPIC.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null))),
+      ConfigAdminManager.reassembleLegacyResponse(original, preprocessed1, persistentResponses1))
+    val preprocessed2 = new util.IdentityHashMap[LAlterConfigsResource, ApiError]()
+    val persistentResponses2 = new AlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new LAlterConfigsResourceResponse().
+        setResourceName("0").
+        setResourceType(BROKER.id()).
+        setErrorCode(NONE.code()).
+        setErrorMessage(null),
+        new LAlterConfigsResourceResponse().
+          setResourceName("a").
+          setResourceType(TOPIC.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null),
+      ))
+    assertEquals(new AlterConfigsResponseData().setResponses(
+      util.Arrays.asList(new LAlterConfigsResourceResponse().
+        setResourceName("0").
+        setResourceType(BROKER.id()).
+        setErrorCode(NONE.code()).
+        setErrorMessage(null),
+        new LAlterConfigsResourceResponse().
+          setResourceName("a").
+          setResourceType(TOPIC.id()).
+          setErrorCode(NONE.code()).
+          setErrorMessage(null))),
+      ConfigAdminManager.reassembleLegacyResponse(original, preprocessed2, persistentResponses2))
+  }
+
+  @Test
+  def testValidateResourceNameIsCurrentNodeId(): Unit = {
+    val manager = newConfigAdminManager(5)
+    manager.validateResourceNameIsCurrentNodeId("5")
+    assertEquals("Node id must be an integer, but it is: ",
+      Assertions.assertThrows(classOf[InvalidRequestException],
+        () => manager.validateResourceNameIsCurrentNodeId("")).getMessage())
+    assertEquals("Unexpected broker id, expected 5, but received 3",
+      Assertions.assertThrows(classOf[InvalidRequestException],
+        () => manager.validateResourceNameIsCurrentNodeId("3")).getMessage())
+    assertEquals("Node id must be an integer, but it is: e",
+      Assertions.assertThrows(classOf[InvalidRequestException],
+        () => manager.validateResourceNameIsCurrentNodeId("e")).getMessage())
+  }
+
+  @Test
+  def testValidateLogLevelConfigs(): Unit = {
+    val manager = newConfigAdminManager(5)
+    manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+      setName(logger.getName).
+      setConfigOperation(OpType.SET.id()).
+      setValue("TRACE")))
+    manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+      setName(logger.getName).
+      setConfigOperation(OpType.DELETE.id()).
+      setValue("")))
+    assertEquals("APPEND operation is not allowed for the BROKER_LOGGER resource",
+      Assertions.assertThrows(classOf[InvalidRequestException],
+        () => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+          setName(logger.getName).
+          setConfigOperation(OpType.APPEND.id()).
+          setValue("TRACE")))).getMessage())
+    assertEquals(s"Cannot set the log level of ${logger.getName} to BOGUS as it is not " +
+      s"a supported log level. Valid log levels are ${VALID_LOG_LEVELS.asScala.mkString(", ")}",
+      Assertions.assertThrows(classOf[InvalidConfigurationException],
+        () => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+          setName(logger.getName).
+          setConfigOperation(OpType.SET.id()).
+          setValue("BOGUS")))).getMessage())
+  }
+
+  @Test
+  def testValidateRootLogLevelConfigs(): Unit = {
+    val manager = newConfigAdminManager(5)
+    manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+      setName(Log4jController.ROOT_LOGGER).
+      setConfigOperation(OpType.SET.id()).
+      setValue("TRACE")))
+    assertEquals(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed",
+      Assertions.assertThrows(classOf[InvalidRequestException],
+        () => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+          setName(Log4jController.ROOT_LOGGER).
+          setConfigOperation(OpType.DELETE.id()).
+          setValue("")))).getMessage())
+  }
+
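+  // BROKER_LOGGER fixtures: a valid log level SET on broker 1, and a SET with a
+  // null value on broker 2, used to exercise null-value validation.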
+  def brokerLogger1Incremental(): IAlterConfigsResource = new IAlterConfigsResource().
+    setResourceName("1").
+    setResourceType(BROKER_LOGGER.id).
+    setConfigs(new IAlterableConfigCollection(
+      util.Arrays.asList(new IAlterableConfig().setName(logger.getName).
+        setValue("INFO").
+        setConfigOperation(OpType.SET.id())).iterator()))
+
+  def brokerLogger2Incremental(): IAlterConfigsResource = new IAlterConfigsResource().
+    setResourceName("2").
+    setResourceType(BROKER_LOGGER.id).
+    setConfigs(new IAlterableConfigCollection(
+      util.Arrays.asList(new IAlterableConfig().setName(logger.getName).
+        setValue(null).
+        setConfigOperation(OpType.SET.id())).iterator()))
+
+  @Test
+  def testPreprocessIncrementalWithUnauthorizedBrokerLoggerChanges(): Unit = {
+    val manager = newConfigAdminManager(1)
+    val brokerLogger1 = brokerLogger1Incremental()
+    assertEquals(Collections.singletonMap(brokerLogger1,
+        new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)),
+      manager.preprocess(new IncrementalAlterConfigsRequestData().
+        setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+          brokerLogger1).iterator())),
+          (_, _) => false))
+  }
+
+  @Test
+  def testPreprocessIncrementalWithNulls(): Unit = {
+    val manager = newConfigAdminManager(2)
+    val brokerLogger2 = brokerLogger2Incremental()
+    assertEquals(Collections.singletonMap(brokerLogger2,
+      new ApiError(INVALID_REQUEST, s"Null value not supported for : ${logger.getName}")),
+      manager.preprocess(new IncrementalAlterConfigsRequestData().
+        setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+          brokerLogger2).iterator())),
+        (_, _) => true))
+  }
+
+  @Test
+  def testPreprocessIncrementalWithLoggerChanges(): Unit = {
+    val manager = newConfigAdminManager(1)
+    val brokerLogger1 = brokerLogger1Incremental()
+    assertEquals(Collections.singletonMap(brokerLogger1,
+      new ApiError(Errors.NONE, null)),
+      manager.preprocess(new IncrementalAlterConfigsRequestData().
+        setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+          brokerLogger1).iterator())),
+        (_, _) => true))
+  }
+
+  @Test
+  def testPreprocessIncrementalWithDuplicates(): Unit = {
+    val manager = newConfigAdminManager(1)
+    val brokerLogger1a = brokerLogger1Incremental()
+    val brokerLogger1b = brokerLogger1Incremental()
+    val output = manager.preprocess(new IncrementalAlterConfigsRequestData().
+        setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+          brokerLogger1a, brokerLogger1b).iterator())),
+        (_, _) => true)
+    assertEquals(2, output.size())
+    Seq(brokerLogger1a, brokerLogger1b).foreach(r =>
+      assertEquals(new ApiError(INVALID_REQUEST, "Each resource must appear at most once."),
+        output.get(r)))
+  }
+
+  def brokerLogger1Legacy(): LAlterConfigsResource = new LAlterConfigsResource().
+    setResourceName("1").
+    setResourceType(BROKER_LOGGER.id).
+    setConfigs(new LAlterableConfigCollection(
+      util.Arrays.asList(new LAlterableConfig().setName(logger.getName).
+        setValue("INFO")).iterator()))
+
+  def broker2Legacy(): LAlterConfigsResource = new LAlterConfigsResource().
+    setResourceName("2").
+    setResourceType(BROKER.id).
+    setConfigs(new LAlterableConfigCollection(
+      util.Arrays.asList(new LAlterableConfig().setName(logger.getName).
+        setValue(null)).iterator()))
+
+  @Test
+  def testPreprocessLegacyWithBrokerLoggerChanges(): Unit = {
+    val manager = newConfigAdminManager(1)
+    val brokerLogger1 = brokerLogger1Legacy()
+    assertEquals(Collections.singletonMap(brokerLogger1,
+      new ApiError(INVALID_REQUEST, "Unknown resource type 8")),
+      manager.preprocess(new AlterConfigsRequestData().
+        setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+          brokerLogger1).iterator()))))
+  }
+
+  @Test
+  def testPreprocessLegacyWithNulls(): Unit = {
+    val manager = newConfigAdminManager(2)
+    val brokerLogger2 = broker2Legacy()
+    assertEquals(Collections.singletonMap(brokerLogger2,
+      new ApiError(INVALID_REQUEST, s"Null value not supported for : ${logger.getName}")),
+      manager.preprocess(new AlterConfigsRequestData().
+        setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+          brokerLogger2).iterator()))))
+  }
+
+  @Test
+  def testPreprocessLegacyWithDuplicates(): Unit = {
+    val manager = newConfigAdminManager(1)
+    val brokerLogger1a = brokerLogger1Legacy()
+    val brokerLogger1b = brokerLogger1Legacy()
+    val output = manager.preprocess(new AlterConfigsRequestData().
+      setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+        brokerLogger1a, brokerLogger1b).iterator())))
+    assertEquals(2, output.size())
+    Seq(brokerLogger1a, brokerLogger1b).foreach(r =>
+      assertEquals(new ApiError(INVALID_REQUEST, "Each resource must appear at most once."),
+        output.get(r)))
+  }
+
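+  // Fixtures with an UNKNOWN resource type, which preprocess() rejects.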
+  def unknownIncremental(): IAlterConfigsResource = new IAlterConfigsResource().
+    setResourceName("unknown").
+    setResourceType(UNKNOWN.id).
+    setConfigs(new IAlterableConfigCollection(
+      util.Arrays.asList(new IAlterableConfig().setName("foo").
+        setValue("bar").
+        setConfigOperation(OpType.SET.id())).iterator()))
+
+  def unknownLegacy(): LAlterConfigsResource = new LAlterConfigsResource().
+    setResourceName("unknown").
+    setResourceType(UNKNOWN.id).
+    setConfigs(new LAlterableConfigCollection(
+      util.Arrays.asList(new LAlterableConfig().setName("foo").
+        setValue("bar")).iterator()))
+
+  @Test
+  def testPreprocessIncrementalWithUnknownResource(): Unit = {
+    val manager = newConfigAdminManager(1)
+    val unknown = unknownIncremental()
+    assertEquals(Collections.singletonMap(unknown,
+      new ApiError(INVALID_REQUEST, "Unknown resource type 0")),
+        manager.preprocess(new IncrementalAlterConfigsRequestData().
+        setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+          unknown).iterator())),
+        (_, _) => false))
+  }
+
+  @Test
+  def testPreprocessLegacyWithUnknownResource(): Unit = {
+    val manager = newConfigAdminManager(1)
+    val unknown = unknownLegacy()
+    assertEquals(Collections.singletonMap(unknown,
+      new ApiError(INVALID_REQUEST, "Unknown resource type 0")),
+      manager.preprocess(new AlterConfigsRequestData().
+        setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+          unknown).iterator()))))
+  }
+
+  @Test
+  def testContainsDuplicates(): Unit = {
+    assertFalse(ConfigAdminManager.containsDuplicates(Seq()))
+    assertFalse(ConfigAdminManager.containsDuplicates(Seq("foo")))
+    assertTrue(ConfigAdminManager.containsDuplicates(Seq("foo", "foo")))
+    assertFalse(ConfigAdminManager.containsDuplicates(Seq("foo", "bar", "baz")))
+    assertTrue(ConfigAdminManager.containsDuplicates(Seq("foo", "bar", "baz", "foo")))
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 8565385..02328ee 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -412,7 +412,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     EasyMock.expectLastCall().once()
     EasyMock.replay(handler)
 
-    val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
+    val configManager = new ZkConfigManager(zkClient, Map(ConfigType.Topic -> handler))
     // Notifications created using the old TopicConfigManager are ignored.
     configManager.ConfigChangedNotificationHandler.processNotification("not json".getBytes(StandardCharsets.UTF_8))
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 794ecea..e617eab 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -34,7 +34,7 @@ import kafka.log.AppendOrigin
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache}
-import kafka.utils.{MockTime, TestUtils}
+import kafka.utils.{Log4jController, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
@@ -43,9 +43,20 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => LAlterConfigsResourceCollection}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => LAlterableConfigCollection}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => LAlterableConfig}
+import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
 import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection => IAlterConfigsResourceCollection}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig => IAlterableConfig}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection => IAlterableConfigCollection}
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
@@ -84,7 +95,6 @@ import scala.jdk.CollectionConverters._
 import java.util.Arrays
 
 class KafkaApisTest {
-
   private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
   private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
   private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
@@ -466,12 +476,6 @@ class KafkaApisTest {
   }
 
   @Test
-  def testAlterConfigsWithForwarding(): Unit = {
-    val requestBuilder = new AlterConfigsRequest.Builder(Collections.emptyMap(), false)
-    testForwardableApi(ApiKeys.ALTER_CONFIGS, requestBuilder)
-  }
-
-  @Test
   def testElectLeadersForwarding(): Unit = {
     val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000)
     testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
@@ -643,13 +647,6 @@ class KafkaApisTest {
     verify(authorizer, adminManager)
   }
 
-  @Test
-  def testIncrementalAlterConfigsWithForwarding(): Unit = {
-    val requestBuilder = new IncrementalAlterConfigsRequest.Builder(
-      new IncrementalAlterConfigsRequestData())
-    testForwardableApi(ApiKeys.INCREMENTAL_ALTER_CONFIGS, requestBuilder)
-  }
-
   private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
     val resourceMap = configResources.map(configResource => {
       configResource -> Set(
@@ -4154,9 +4151,38 @@ class KafkaApisTest {
   }
 
   @Test
-  def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
+  def testEmptyLegacyAlterConfigsRequestWithKRaft(): Unit = {
+    val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData(), 1.toShort))
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
+    val capturedResponse = expectNoThrottling(request)
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
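+    // With KRaft support, an empty legacy AlterConfigs request is answered directly with an
+    // empty response rather than being forwarded wholesale to the controller.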
+    assertEquals(new AlterConfigsResponseData(),
+      capturedResponse.getValue.asInstanceOf[AlterConfigsResponse].data())
+  }
+
+  @Test
+  def testInvalidLegacyAlterConfigsRequestWithKRaft(): Unit = {
+    val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData().
+      setValidateOnly(true).
+      setResources(new LAlterConfigsResourceCollection(Arrays.asList(
+        new LAlterConfigsResource().
+          setResourceName(brokerId.toString).
+          setResourceType(BROKER.id()).
+          setConfigs(new LAlterableConfigCollection(Arrays.asList(new LAlterableConfig().
+            setName("foo").
+            setValue(null)).iterator()))).iterator())), 1.toShort))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+    val capturedResponse = expectNoThrottling(request)
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
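+    // Validation now runs on the receiving broker: the null config value is rejected
+    // locally with INVALID_REQUEST.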
+    assertEquals(new AlterConfigsResponseData().setResponses(Arrays.asList(
+      new LAlterConfigsResourceResponse().
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Null value not supported for : foo").
+        setResourceName(brokerId.toString).
+        setResourceType(BROKER.id()))),
+      capturedResponse.getValue.asInstanceOf[AlterConfigsResponse].data())
   }
 
   @Test
@@ -4166,9 +4192,38 @@ class KafkaApisTest {
   }
 
   @Test
-  def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
+  def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
+    val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+    val capturedResponse = expectNoThrottling(request)
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)
+    assertEquals(new IncrementalAlterConfigsResponseData(),
+      capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse].data())
+  }
+
+  @Test
+  def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
+    val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
+      setValidateOnly(true).
+      setResources(new IAlterConfigsResourceCollection(Arrays.asList(new IAlterConfigsResource().
+        setResourceName(brokerId.toString).
+        setResourceType(BROKER_LOGGER.id()).
+        setConfigs(new IAlterableConfigCollection(Arrays.asList(new IAlterableConfig().
+          setName(Log4jController.ROOT_LOGGER).
+          setValue("TRACE")).iterator()))).iterator())),
+        1.toShort))
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
+    val capturedResponse = expectNoThrottling(request)
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)
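+    // BROKER_LOGGER resources are processed on the broker itself, so this valid log4j
+    // change succeeds on the KRaft broker.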
+    assertEquals(new IncrementalAlterConfigsResponseData().setResponses(Arrays.asList(
+      new IAlterConfigsResourceResponse().
+        setErrorCode(Errors.NONE.code()).
+        setErrorMessage(null).
+        setResourceName(brokerId.toString).
+        setResourceType(BROKER_LOGGER.id()))),
+      capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse].data())
   }
 
   @Test
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 37df567..3a7f3c4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -267,11 +268,16 @@ public final class QuorumController implements Controller {
                 case BROKER_LOGGER:
                     break;
                 case BROKER:
+                    // Cluster configs are always allowed.
+                    if (configResource.name().isEmpty()) break;
+
+                    // Otherwise, check that the broker ID is valid.
                     int brokerId;
                     try {
                         brokerId = Integer.parseInt(configResource.name());
                     } catch (NumberFormatException e) {
-                        brokerId = -1;
+                        throw new InvalidRequestException("Invalid broker name " +
+                            configResource.name());
                     }
                     if (!clusterControl.brokerRegistrations().containsKey(brokerId)) {
                         throw new BrokerIdNotRegisteredException("No broker with id " +
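
For context, the "default broker resource" that this check now admits (type=BROKER with an
empty name) is how clients apply cluster-wide dynamic configs. Below is a minimal
client-side sketch using the public Admin API; it is not part of this commit, and the
bootstrap address and the log.cleaner.threads key are only illustrative:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        // An empty resource name selects the cluster-wide default BROKER resource.
        ConfigResource cluster = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
        admin.incrementalAlterConfigs(
            Collections.singletonMap(cluster, Collections.singletonList(op))).all().get();
    }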
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 9d2cf77..84585f3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -35,6 +35,8 @@ import java.util.stream.StreamSupport;
 import java.util.stream.IntStream;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -69,6 +71,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
+import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.MetadataRecordSerde;
@@ -86,6 +89,8 @@ import org.junit.jupiter.api.Timeout;
 
 import static java.util.concurrent.TimeUnit.HOURS;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.CONFIGS;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
@@ -870,4 +875,38 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testConfigResourceExistenceChecker() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                     new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
+                QuorumController active = controlEnv.activeController();
+                registerBrokers(active, 5);
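+                // Brokers 0 through 4 are registered, so broker 3 below is known
+                // and broker 10 is not.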
+                active.createTopics(new CreateTopicsRequestData().
+                    setTopics(new CreatableTopicCollection(Collections.singleton(
+                        new CreatableTopic().setName("foo").
+                            setReplicationFactor((short) 3).
+                            setNumPartitions(1)).iterator()))).get();
+                ConfigResourceExistenceChecker checker =
+                    active.new ConfigResourceExistenceChecker();
+                // A ConfigResource with type=BROKER and name=(empty string) represents
+                // the default broker resource. It is used to set cluster configs.
+                checker.accept(new ConfigResource(BROKER, ""));
+
+                // Broker 3 exists, so we can set a configuration for it.
+                checker.accept(new ConfigResource(BROKER, "3"));
+
+                // Broker 10 does not exist, so this should throw an exception.
+                assertThrows(BrokerIdNotRegisteredException.class,
+                    () -> checker.accept(new ConfigResource(BROKER, "10")));
+
+                // Topic foo exists, so we can set a configuration for it.
+                checker.accept(new ConfigResource(TOPIC, "foo"));
+
+                // Topic bar does not exist, so this should throw an exception.
+                assertThrows(UnknownTopicOrPartitionException.class,
+                    () -> checker.accept(new ConfigResource(TOPIC, "bar")));
+            }
+        }
+    }
 }