You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/09/15 15:54:39 UTC

[kafka] branch trunk updated: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7de8a93  KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186)
7de8a93 is described below

commit 7de8a93c7e1036553ab6e738cf91bc39e154719e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Sep 15 08:52:45 2021 -0700

    KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186)
    
    This patch fixes several problems with the `ElectLeaders` API in KRaft:
    
    - `KafkaApis` did not properly forward this request type to the controller.
    - `ControllerApis` did not handle the request type.
    - `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null.
    - Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
    - Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
    - Controller should use proper error codes to handle cases when desired leader is unavailable or when no election is needed because a desired leader is already elected.
    - When election for all partitions is requested (indicated with null `TopicPartitions` field), the response should not return partitions for which no election was necessary.
    
    In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft.
    
    Reviewers: dengziming <sw...@163.com>, José Armando García Sancio <js...@users.noreply.github.com>, David Arthur <mu...@gmail.com>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   2 +-
 .../kafka/common/requests/ElectLeadersRequest.java |  24 +-
 .../kafka/server/BrokerLifecycleManager.scala      |   8 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  17 +-
 .../main/scala/kafka/server/ControllerApis.scala   |  24 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   9 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |   2 +
 core/src/test/java/kafka/test/ClusterInstance.java |   6 +
 .../test/junit/RaftClusterInvocationContext.java   |  63 ++-
 .../test/junit/ZkClusterInvocationContext.java     |  43 +-
 .../kafka/server/IntegrationTestUtils.scala        |  38 +-
 .../admin/LeaderElectionCommandErrorTest.scala     |  97 ++++
 .../kafka/admin/LeaderElectionCommandTest.scala    | 371 ++++++--------
 .../unit/kafka/server/ControllerApisTest.scala     | 105 +++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  92 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  77 ++-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  37 +-
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala |   7 +-
 .../kafka/controller/ClusterControlManager.java    |   6 +-
 .../kafka/controller/PartitionChangeBuilder.java   |  23 +
 .../controller/ReplicationControlManager.java      |  59 ++-
 .../org/apache/kafka/image/ClientQuotasDelta.java  |   7 +
 .../java/org/apache/kafka/image/ClusterDelta.java  |   7 +
 .../org/apache/kafka/image/ConfigurationDelta.java |   9 +
 .../apache/kafka/image/ConfigurationsDelta.java    |   7 +
 .../java/org/apache/kafka/image/FeaturesDelta.java |   7 +
 .../java/org/apache/kafka/image/MetadataDelta.java |  11 +
 .../java/org/apache/kafka/image/TopicDelta.java    |   7 +
 .../java/org/apache/kafka/image/TopicsDelta.java   |   8 +
 .../controller/PartitionChangeBuilderTest.java     |   8 +
 .../controller/ReplicationControlManagerTest.java  | 560 ++++++++++++++++-----
 31 files changed, 1210 insertions(+), 531 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 423093a..428e4a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -84,7 +84,7 @@ public enum ApiKeys {
     EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true),
     DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN),
     DELETE_GROUPS(ApiMessageType.DELETE_GROUPS),
-    ELECT_LEADERS(ApiMessageType.ELECT_LEADERS),
+    ELECT_LEADERS(ApiMessageType.ELECT_LEADERS, false, true),
     INCREMENTAL_ALTER_CONFIGS(ApiMessageType.INCREMENTAL_ALTER_CONFIGS, false, true),
     ALTER_PARTITION_REASSIGNMENTS(ApiMessageType.ALTER_PARTITION_REASSIGNMENTS, false, true),
     LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS, false, true),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
index 1bc888a..febb030 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
@@ -103,20 +103,22 @@ public class ElectLeadersRequest extends AbstractRequest {
         ApiError apiError = ApiError.fromThrowable(e);
         List<ReplicaElectionResult> electionResults = new ArrayList<>();
 
-        for (TopicPartitions topic : data.topicPartitions()) {
-            ReplicaElectionResult electionResult = new ReplicaElectionResult();
+        if (data.topicPartitions() != null) {
+            for (TopicPartitions topic : data.topicPartitions()) {
+                ReplicaElectionResult electionResult = new ReplicaElectionResult();
 
-            electionResult.setTopic(topic.topic());
-            for (Integer partitionId : topic.partitions()) {
-                PartitionResult partitionResult = new PartitionResult();
-                partitionResult.setPartitionId(partitionId);
-                partitionResult.setErrorCode(apiError.error().code());
-                partitionResult.setErrorMessage(apiError.message());
+                electionResult.setTopic(topic.topic());
+                for (Integer partitionId : topic.partitions()) {
+                    PartitionResult partitionResult = new PartitionResult();
+                    partitionResult.setPartitionId(partitionId);
+                    partitionResult.setErrorCode(apiError.error().code());
+                    partitionResult.setErrorMessage(apiError.message());
 
-                electionResult.partitionResult().add(partitionResult);
-            }
+                    electionResult.partitionResult().add(partitionResult);
+                }
 
-            electionResults.add(electionResult);
+                electionResults.add(electionResult);
+            }
         }
 
         return new ElectLeadersResponse(throttleTimeMs, apiError.error().code(), electionResults, version());
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index e15a3e6..394c353 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -208,7 +208,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
           _state = BrokerState.PENDING_CONTROLLED_SHUTDOWN
           // Send the next heartbeat immediately in order to let the controller
           // begin processing the controlled shutdown as soon as possible.
-          scheduleNextCommunication(0)
+          scheduleNextCommunicationImmediately()
 
         case _ =>
           info(s"Skipping controlled shutdown because we are in state ${_state}.")
@@ -284,8 +284,8 @@ class BrokerLifecycleManager(val config: KafkaConfig,
         setIncarnationId(incarnationId).
         setListeners(_advertisedListeners).
         setRack(rack.orNull)
-    if (isTraceEnabled) {
-      trace(s"Sending broker registration ${data}")
+    if (isDebugEnabled) {
+      debug(s"Sending broker registration ${data}")
     }
     _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
       new BrokerRegistrationResponseHandler())
@@ -406,7 +406,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
                   scheduleNextCommunicationAfterSuccess()
                 }
               } else {
-                info(s"The controlled has asked us to exit controlled shutdown.")
+                info(s"The controller has asked us to exit controlled shutdown.")
                 beginShutdown()
               }
               gotControlledShutdownResponse = true
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d2079c4..533036f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,11 +17,11 @@
 
 package kafka.server
 
+import java.net.InetAddress
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
-import java.net.InetAddress
 
 import kafka.cluster.Broker.ServerInfo
 import kafka.coordinator.group.GroupCoordinator
@@ -34,7 +34,6 @@ import kafka.security.CredentialProvider
 import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
 import kafka.utils.{CoreUtils, KafkaScheduler}
-import org.apache.kafka.snapshot.SnapshotWriter
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
 import org.apache.kafka.common.metrics.Metrics
@@ -45,14 +44,15 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.raft.{RaftClient, RaftConfig}
 import org.apache.kafka.raft.RaftConfig.AddressSpec
+import org.apache.kafka.raft.{RaftClient, RaftConfig}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
 
 import scala.collection.{Map, Seq}
-import scala.jdk.CollectionConverters._
 import scala.compat.java8.OptionConverters._
+import scala.jdk.CollectionConverters._
 
 
 class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
@@ -92,8 +92,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  val lifecycleManager: BrokerLifecycleManager =
-    new BrokerLifecycleManager(config, time, threadNamePrefix)
+  @volatile private var lifecycleManager: BrokerLifecycleManager = null
 
   private val isShuttingDown = new AtomicBoolean(false)
 
@@ -105,7 +104,7 @@ class BrokerServer(
   var controlPlaneRequestProcessor: KafkaApis = null
 
   var authorizer: Option[Authorizer] = None
-  var socketServer: SocketServer = null
+  @volatile var socketServer: SocketServer = null
   var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
 
   var logDirFailureChannel: LogDirFailureChannel = null
@@ -162,6 +161,8 @@ class BrokerServer(
     lock.lock()
     try {
       if (status != from) return false
+      info(s"Transition from $status to $to")
+
       status = to
       if (to == SHUTTING_DOWN) {
         isShuttingDown.set(true)
@@ -182,6 +183,8 @@ class BrokerServer(
     try {
       info("Starting broker")
 
+      lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix)
+
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
       kafkaScheduler.startup()
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 2f86e1f..ed9b55a 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -20,8 +20,8 @@ package kafka.server
 import java.util
 import java.util.Collections
 import java.util.Map.Entry
-import java.util.concurrent.{CompletableFuture, ExecutionException}
 import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
+import java.util.concurrent.{CompletableFuture, ExecutionException}
 
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
@@ -36,11 +36,10 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
-import org.apache.kafka.common.message._
+import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.requests._
@@ -108,6 +107,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
         case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
         case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
+        case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
         case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
       }
     } catch {
@@ -488,6 +488,24 @@ class ControllerApis(val requestChannel: RequestChannel,
     handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
   }
 
+  def handleElectLeaders(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, ALTER)
+
+    val electLeadersRequest = request.body[ElectLeadersRequest]
+    val future = controller.electLeaders(electLeadersRequest.data)
+    future.whenComplete { (responseData, exception) =>
+      if (exception != null) {
+        requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
+          electLeadersRequest.getErrorResponse(throttleMs, exception)
+        })
+      } else {
+        requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
+          new ElectLeadersResponse(responseData.setThrottleTimeMs(throttleMs))
+        })
+      }
+    }
+  }
+
   def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
     val alterIsrRequest = request.body[AlterIsrRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c2e0c15..e20a36a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -209,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
-        case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
+        case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
         case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
         case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
@@ -2993,9 +2993,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       true
   }
 
-  def handleElectReplicaLeader(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
-
+  def handleElectLeaders(request: RequestChannel.Request): Unit = {
+    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
     val electionRequest = request.body[ElectLeadersRequest]
 
     def sendResponseCallback(
@@ -3006,7 +3005,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
         val adjustedResults = if (electionRequest.data.topicPartitions == null) {
           /* When performing elections across all of the partitions we should only return
-           * partitions for which there was an eleciton or resulted in an error. In other
+           * partitions for which there was an election or resulted in an error. In other
            * words, partitions that didn't need election because they ready have the correct
            * leader are not returned to the client.
            */
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 4fe4938..cedef47 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                        delta: MetadataDelta,
                        newImage: MetadataImage): Unit = {
     try {
+      trace(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")
+
       // Publish the new metadata image to the metadata cache.
       metadataCache.setImage(newImage)
 
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 021db5a..23b417e 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -95,5 +95,11 @@ public interface ClusterInstance {
 
     void stop();
 
+    void shutdownBroker(int brokerId);
+
+    void startBroker(int brokerId);
+
     void rollingBrokerRestart();
+
+    void waitForReadyBrokers() throws InterruptedException;
 }
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index c60e0ec..599bdf0 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -27,6 +27,7 @@ import kafka.testkit.TestKitNodes;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.BrokerState;
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
 import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
@@ -36,10 +37,14 @@ import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
@@ -73,6 +78,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
     @Override
     public List<Extension> getAdditionalExtensions() {
+        RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, clusterConfig);
         return Arrays.asList(
             (BeforeTestExecutionCallback) context -> {
                 TestKitNodes nodes = new TestKitNodes.Builder().
@@ -97,8 +103,8 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
                     org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
                     100L);
             },
-            (AfterTestExecutionCallback) context -> clusterReference.get().close(),
-            new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)),
+            (AfterTestExecutionCallback) context -> clusterInstance.stop(),
+            new ClusterInstanceParameterResolver(clusterInstance),
             new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
         );
     }
@@ -109,6 +115,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
         private final ClusterConfig clusterConfig;
         final AtomicBoolean started = new AtomicBoolean(false);
         final AtomicBoolean stopped = new AtomicBoolean(false);
+        private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
 
         RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
             this.clusterReference = clusterReference;
@@ -122,7 +129,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
         @Override
         public Collection<SocketServer> brokerSocketServers() {
-            return clusterReference.get().brokers().values().stream()
+            return brokers()
                 .map(BrokerServer::socketServer)
                 .collect(Collectors.toList());
         }
@@ -134,14 +141,14 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
         @Override
         public Collection<SocketServer> controllerSocketServers() {
-            return clusterReference.get().controllers().values().stream()
+            return controllers()
                 .map(ControllerServer::socketServer)
                 .collect(Collectors.toList());
         }
 
         @Override
         public SocketServer anyBrokerSocketServer() {
-            return clusterReference.get().brokers().values().stream()
+            return brokers()
                 .map(BrokerServer::socketServer)
                 .findFirst()
                 .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
@@ -149,7 +156,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
         @Override
         public SocketServer anyControllerSocketServer() {
-            return clusterReference.get().controllers().values().stream()
+            return controllers()
                 .map(ControllerServer::socketServer)
                 .findFirst()
                 .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
@@ -172,7 +179,9 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
         @Override
         public Admin createAdminClient(Properties configOverrides) {
-            return Admin.create(clusterReference.get().clientProperties());
+            Admin admin = Admin.create(clusterReference.get().clientProperties());
+            admins.add(admin);
+            return admin;
         }
 
         @Override
@@ -189,11 +198,27 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
         @Override
         public void stop() {
             if (stopped.compareAndSet(false, true)) {
-                try {
-                    clusterReference.get().close();
-                } catch (Exception e) {
-                    throw new RuntimeException("Failed to stop Raft server", e);
-                }
+                admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));
+                Utils.closeQuietly(clusterReference.get(), "cluster");
+            }
+        }
+
+        @Override
+        public void shutdownBroker(int brokerId) {
+            findBrokerOrThrow(brokerId).shutdown();
+        }
+
+        @Override
+        public void startBroker(int brokerId) {
+            findBrokerOrThrow(brokerId).startup();
+        }
+
+        @Override
+        public void waitForReadyBrokers() throws InterruptedException {
+            try {
+                clusterReference.get().waitForReadyBrokers();
+            } catch (ExecutionException e) {
+                throw new AssertionError("Failed while waiting for brokers to become ready", e);
             }
         }
 
@@ -201,5 +226,19 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
         public void rollingBrokerRestart() {
             throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
         }
+
+        private BrokerServer findBrokerOrThrow(int brokerId) {
+            return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
+                .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
+        }
+
+        private Stream<BrokerServer> brokers() {
+            return clusterReference.get().brokers().values().stream();
+        }
+
+        private Stream<ControllerServer> controllers() {
+            return clusterReference.get().controllers().values().stream();
+        }
+
     }
 }
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index cd8cdc1..4d4323f 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -21,12 +21,12 @@ import kafka.api.IntegrationTestHarness;
 import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import kafka.test.ClusterConfig;
-import kafka.test.ClusterInstance;
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
 import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
 import org.junit.jupiter.api.extension.Extension;
@@ -44,6 +44,7 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
@@ -193,7 +194,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
 
         @Override
         public Collection<SocketServer> brokerSocketServers() {
-            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+            return servers()
                     .map(KafkaServer::socketServer)
                     .collect(Collectors.toList());
         }
@@ -205,7 +206,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
 
         @Override
         public Collection<SocketServer> controllerSocketServers() {
-            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+            return servers()
                 .filter(broker -> broker.kafkaController().isActive())
                 .map(KafkaServer::socketServer)
                 .collect(Collectors.toList());
@@ -213,7 +214,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
 
         @Override
         public SocketServer anyBrokerSocketServer() {
-            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+            return servers()
                 .map(KafkaServer::socketServer)
                 .findFirst()
                 .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
@@ -221,7 +222,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
 
         @Override
         public SocketServer anyControllerSocketServer() {
-            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+            return servers()
                 .filter(broker -> broker.kafkaController().isActive())
                 .map(KafkaServer::socketServer)
                 .findFirst()
@@ -263,6 +264,16 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
         }
 
         @Override
+        public void shutdownBroker(int brokerId) {
+            findBrokerOrThrow(brokerId).shutdown();
+        }
+
+        @Override
+        public void startBroker(int brokerId) {
+            findBrokerOrThrow(brokerId).startup();
+        }
+
+        @Override
         public void rollingBrokerRestart() {
             if (!started.get()) {
                 throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
@@ -272,5 +283,25 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
             }
             clusterReference.get().restartDeadBrokers(true);
         }
+
+        @Override
+        public void waitForReadyBrokers() throws InterruptedException {
+            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
+                int numRegisteredBrokers = clusterReference.get().zkClient().getAllBrokersInCluster().size();
+                return numRegisteredBrokers == config.numBrokers();
+            }, "Timed out while waiting for brokers to become ready");
+        }
+
+        private KafkaServer findBrokerOrThrow(int brokerId) {
+            return servers()
+                .filter(server -> server.config().brokerId() == brokerId)
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
+        }
+
+        private Stream<KafkaServer> servers() {
+            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
+        }
+
     }
 }
diff --git a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
index 9a21e7f..203e181 100644
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -17,19 +17,23 @@
 
 package kafka.server
 
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.{Collections, Properties}
+
 import kafka.network.SocketServer
+import kafka.utils.Implicits._
 import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
-import java.io.{DataInputStream, DataOutputStream}
-import java.net.Socket
-import java.nio.ByteBuffer
-import java.util.Properties
 import scala.annotation.nowarn
+import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 object IntegrationTestUtils {
@@ -101,6 +105,32 @@ object IntegrationTestUtils {
     finally socket.close()
   }
 
+  def createTopic(
+    admin: Admin,
+    topic: String,
+    numPartitions: Int,
+    replicationFactor: Short
+  ): Unit = {
+    val newTopics = Collections.singletonList(new NewTopic(topic, numPartitions, replicationFactor))
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get()
+  }
+
+  def createTopic(
+    admin: Admin,
+    topic: String,
+    replicaAssignment: Map[Int, Seq[Int]]
+  ): Unit = {
+    val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]()
+    replicaAssignment.forKeyValue { (partitionId, assignment) =>
+      javaAssignment.put(partitionId, assignment.map(Int.box).asJava)
+    }
+    val newTopic = new NewTopic(topic, javaAssignment)
+    val newTopics = Collections.singletonList(newTopic)
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get()
+  }
+
   protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
   private var correlationId = 0
 
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
new file mode 100644
index 0000000..eaef936
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.common.AdminCommandFailedException
+import org.apache.kafka.common.errors.TimeoutException
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.concurrent.duration._
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead for
+ * cluster creation and cleanup because the command is expected to fail immediately.
+ */
+class LeaderElectionCommandErrorTest {
+
+  @Test
+  def testTopicWithoutPartition(): Unit = {
+    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", "nohost:9092",
+        "--election-type", "unclean",
+        "--topic", "some-topic"
+      )
+    ))
+    assertTrue(e.getMessage.startsWith("Missing required option(s)"))
+    assertTrue(e.getMessage.contains(" partition"))
+  }
+
+  @Test
+  def testPartitionWithoutTopic(): Unit = {
+    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", "nohost:9092",
+        "--election-type", "unclean",
+        "--all-topic-partitions",
+        "--partition", "0"
+      )
+    ))
+    assertEquals("Option partition is only allowed if topic is used", e.getMessage)
+  }
+
+  @Test
+  def testMissingElectionType(): Unit = {
+    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", "nohost:9092",
+        "--topic", "some-topic",
+        "--partition", "0"
+      )
+    ))
+    assertTrue(e.getMessage.startsWith("Missing required option(s)"))
+    assertTrue(e.getMessage.contains(" election-type"))
+  }
+
+  @Test
+  def testMissingTopicPartitionSelection(): Unit = {
+    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", "nohost:9092",
+        "--election-type", "preferred"
+      )
+    ))
+    assertTrue(e.getMessage.startsWith("One and only one of the following options is required: "))
+    assertTrue(e.getMessage.contains(" all-topic-partitions"))
+    assertTrue(e.getMessage.contains(" topic"))
+    assertTrue(e.getMessage.contains(" path-to-json-file"))
+  }
+
+  @Test
+  def testInvalidBroker(): Unit = {
+    val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run(
+      Array(
+        "--bootstrap-server", "example.com:1234",
+        "--election-type", "unclean",
+        "--all-topic-partitions"
+      ),
+      1.seconds
+    ))
+    assertTrue(e.getCause.isInstanceOf[TimeoutException])
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index a7b91ea..b942f6f 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -18,214 +18,176 @@ package kafka.admin
 
 import java.io.File
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
-import java.nio.file.Path
+import java.nio.file.{Files, Path}
 
 import kafka.common.AdminCommandFailedException
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
+import kafka.server.IntegrationTestUtils.createTopic
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.{ClusterConfig, ClusterInstance}
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
+import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 import org.apache.kafka.common.network.ListenerName
-import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{BeforeEach, Tag}
 
-import scala.jdk.CollectionConverters._
-import scala.collection.Seq
-import scala.concurrent.duration._
-
-final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
+@Tag("integration")
+final class LeaderElectionCommandTest(cluster: ClusterInstance) {
   import LeaderElectionCommandTest._
 
-  var servers = Seq.empty[KafkaServer]
   val broker1 = 0
   val broker2 = 1
   val broker3 = 2
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
-
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
-    servers = brokerConfigs.map { config =>
-      config.setProperty("auto.leader.rebalance.enable", "false")
-      config.setProperty("controlled.shutdown.enable", "true")
-      config.setProperty("controlled.shutdown.max.retries", "1")
-      config.setProperty("controlled.shutdown.retry.backoff.ms", "1000")
-      TestUtils.createServer(KafkaConfig.fromProps(config))
-    }
+  def setup(clusterConfig: ClusterConfig): Unit = {
+    TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
+    clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
+    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true")
+    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
+    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
+    clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
   }
 
-  @AfterEach
-  override def tearDown(): Unit = {
-    TestUtils.shutdownServers(servers)
-
-    super.tearDown()
-  }
-
-  @Test
+  @ClusterTest
   def testAllTopicPartition(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
-
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      val topicPartition = new TopicPartition(topic, partition)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    TestUtils.assertLeader(client, topicPartition, broker2)
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--all-topic-partitions"
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--all-topic-partitions"
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testTopicPartition(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      val topicPartition = new TopicPartition(topic, partition)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition, broker2)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--topic", topic,
-          "--partition", partition.toString
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--topic", topic,
+        "--partition", partition.toString
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testPathToJsonFile(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      val topicPartition = new TopicPartition(topic, partition)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition, broker2)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
+    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--path-to-json-file", topicPartitionPath.toString
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--path-to-json-file", topicPartitionPath.toString
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testPreferredReplicaElection(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "preferred-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      val topicPartition = new TopicPartition(topic, partition)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition, broker2)
 
-      servers(broker2).shutdown()
-      TestUtils.assertLeader(client, topicPartition, broker3)
-      servers(broker2).startup()
-      TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertLeader(client, topicPartition, broker3)
+    cluster.startBroker(broker2)
+    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "preferred",
-          "--all-topic-partitions"
-        )
-      )
-
-      TestUtils.assertLeader(client, topicPartition, broker2)
-    }
-  }
-
-  @Test
-  def testTopicWithoutPartition(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+    LeaderElectionCommand.main(
       Array(
-        "--bootstrap-server", bootstrapServers(servers),
-        "--election-type", "unclean",
-        "--topic", "some-topic"
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "preferred",
+        "--all-topic-partitions"
       )
-    ))
-    assertTrue(e.getMessage.startsWith("Missing required option(s)"))
-    assertTrue(e.getMessage.contains(" partition"))
-  }
+    )
 
-  @Test
-  def testPartitionWithoutTopic(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", bootstrapServers(servers),
-        "--election-type", "unclean",
-        "--all-topic-partitions",
-        "--partition", "0"
-      )
-    ))
-    assertEquals("Option partition is only allowed if topic is used", e.getMessage)
+    TestUtils.assertLeader(client, topicPartition, broker2)
   }
 
-  @Test
+  @ClusterTest
   def testTopicDoesNotExist(): Unit = {
     val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main(
       Array(
-        "--bootstrap-server", bootstrapServers(servers),
+        "--bootstrap-server", cluster.bootstrapServers(),
         "--election-type", "preferred",
         "--topic", "unknown-topic-name",
         "--partition", "0"
@@ -234,86 +196,55 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
     assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException])
   }
 
-  @Test
-  def testMissingElectionType(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", bootstrapServers(servers),
-        "--topic", "some-topic",
-        "--partition", "0"
-      )
+  @ClusterTest
+  def testElectionResultOutput(): Unit = {
+    val client = cluster.createAdminClient()
+    val topic = "non-preferred-topic"
+    val partition0 = 0
+    val partition1 = 1
+    val assignment0 = Seq(broker2, broker3)
+    val assignment1 = Seq(broker3, broker2)
+
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(
+      partition0 -> assignment0,
+      partition1 -> assignment1
     ))
-    assertTrue(e.getMessage.startsWith("Missing required option(s)"))
-    assertTrue(e.getMessage.contains(" election-type"))
-  }
 
-  @Test
-  def testMissingTopicPartitionSelection(): Unit = {
-    val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
-      Array(
-        "--bootstrap-server", bootstrapServers(servers),
-        "--election-type", "preferred"
-      )
-    ))
-    assertTrue(e.getMessage.startsWith("One and only one of the following options is required: "))
-    assertTrue(e.getMessage.contains(" all-topic-partitions"))
-    assertTrue(e.getMessage.contains(" topic"))
-    assertTrue(e.getMessage.contains(" path-to-json-file"))
-  }
+    val topicPartition0 = new TopicPartition(topic, partition0)
+    val topicPartition1 = new TopicPartition(topic, partition1)
 
-  @Test
-  def testInvalidBroker(): Unit = {
-    val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run(
-      Array(
-        "--bootstrap-server", "example.com:1234",
-        "--election-type", "unclean",
-        "--all-topic-partitions"
-      ),
-      1.seconds
-    ))
-    assertTrue(e.getCause.isInstanceOf[TimeoutException])
-  }
+    TestUtils.assertLeader(client, topicPartition0, broker2)
+    TestUtils.assertLeader(client, topicPartition1, broker3)
 
-  @Test
-  def testElectionResultOutput(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "non-preferred-topic"
-      val partition0 = 0
-      val partition1 = 1
-      val assignment0 = Seq(broker2, broker3)
-      val assignment1 = Seq(broker3, broker2)
-
-      TestUtils.createTopic(zkClient, topic, Map(partition0 -> assignment0, partition1 -> assignment1), servers)
-
-      val topicPartition0 = new TopicPartition(topic, partition0)
-      val topicPartition1 = new TopicPartition(topic, partition1)
-
-      TestUtils.assertLeader(client, topicPartition0, broker2)
-      TestUtils.assertLeader(client, topicPartition1, broker3)
-
-      servers(broker2).shutdown()
-      TestUtils.assertLeader(client, topicPartition0, broker3)
-      servers(broker2).startup()
-      TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
-      TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
-
-      val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
-      val output = TestUtils.grabConsoleOutput(
-        LeaderElectionCommand.main(
-          Array(
-            "--bootstrap-server", bootstrapServers(servers),
-            "--election-type", "preferred",
-            "--path-to-json-file", topicPartitionPath.toString
-          )
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertLeader(client, topicPartition0, broker3)
+    cluster.startBroker(broker2)
+    TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
+    TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
+
+    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
+    val output = TestUtils.grabConsoleOutput(
+      LeaderElectionCommand.main(
+        Array(
+          "--bootstrap-server", cluster.bootstrapServers(),
+          "--election-type", "preferred",
+          "--path-to-json-file", topicPartitionPath.toString
         )
       )
+    )
+
+    val electionResultOutputIter = output.split("\n").iterator
+
+    assertTrue(electionResultOutputIter.hasNext)
+    val firstLine = electionResultOutputIter.next()
+    assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"),
+    s"Unexpected output: $firstLine")
 
-      val electionResultOutputIter = output.split("\n").iterator
-      assertTrue(electionResultOutputIter.hasNext)
-      assertTrue(electionResultOutputIter.next().contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"))
-      assertTrue(electionResultOutputIter.hasNext)
-      assertTrue(electionResultOutputIter.next().contains(s"Valid replica already elected for partitions $topicPartition1"))
-    }
+    assertTrue(electionResultOutputIter.hasNext)
+    val secondLine = electionResultOutputIter.next()
+    assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"),
+    s"Unexpected output: $secondLine")
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 768c06a..318e17e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -19,43 +19,40 @@ package kafka.server
 
 import java.net.InetAddress
 import java.util
+import java.util.Collections.singletonList
 import java.util.Properties
-import java.util.concurrent.CompletableFuture
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{CompletableFuture, ExecutionException}
+
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.test.MockController
-import kafka.utils.MockTime
+import kafka.utils.{MockTime, NotNothing}
 import org.apache.kafka.clients.admin.AlterConfigOp
-import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.Uuid.ZERO_UUID
+import org.apache.kafka.common.acl.AclOperation
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource, AlterConfigsResourceCollection => OldAlterConfigsResourceCollection, AlterableConfig => OldAlterableConfig, AlterableConfigCollection => OldAlterableConfigCollection}
+import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.CreateTopicsRequestData
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
-import org.apache.kafka.common.message._
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => OldAlterConfigsResourceCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => OldAlterableConfigCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => OldAlterableConfig}
-import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
+import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.protocol.ApiMessage
 import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.requests._
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.{ElectionType, Uuid}
 import org.apache.kafka.controller.Controller
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -65,7 +62,9 @@ import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
 
 class ControllerApisTest {
   private val nodeId = 1
@@ -136,12 +135,11 @@ class ControllerApisTest {
 
   def createDenyAllAuthorizer(): Authorizer = {
     val authorizer = mock(classOf[Authorizer])
-    mock(classOf[Authorizer])
     when(authorizer.authorize(
       any(classOf[AuthorizableRequestContext]),
       any(classOf[java.util.List[Action]])
     )).thenReturn(
-      java.util.Collections.singletonList(AuthorizationResult.DENIED)
+      singletonList(AuthorizationResult.DENIED)
     )
     authorizer
   }
@@ -732,6 +730,79 @@ class ControllerApisTest {
       controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
   }
 
+  @Test
+  def testElectLeadersAuthorization(): Unit = {
+    val authorizer = mock(classOf[Authorizer])
+    val controller = mock(classOf[Controller])
+    val controllerApis = createControllerApis(Some(authorizer), controller)
+
+    val request = new ElectLeadersRequest.Builder(
+      ElectionType.PREFERRED,
+      null,
+      30000
+    ).build()
+
+    val resource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val actions = singletonList(new Action(AclOperation.ALTER, resource, 1, true, true))
+
+    when(authorizer.authorize(
+      any[RequestContext],
+      ArgumentMatchers.eq(actions)
+    )).thenReturn(singletonList(AuthorizationResult.DENIED))
+
+    val response = handleRequest[ElectLeadersResponse](request, controllerApis)
+    assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, Errors.forCode(response.data.errorCode))
+  }
+
+  @Test
+  def testElectLeadersHandledByController(): Unit = {
+    val controller = mock(classOf[Controller])
+    val controllerApis = createControllerApis(None, controller)
+
+    val request = new ElectLeadersRequest.Builder(
+      ElectionType.PREFERRED,
+      null,
+      30000
+    ).build()
+
+    val responseData = new ElectLeadersResponseData()
+        .setErrorCode(Errors.NOT_CONTROLLER.code)
+
+    when(controller.electLeaders(
+      request.data
+    )).thenReturn(CompletableFuture.completedFuture(responseData))
+
+    val response = handleRequest[ElectLeadersResponse](request, controllerApis)
+    assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(response.data.errorCode))
+  }
+
+  private def handleRequest[T <: AbstractResponse](
+    request: AbstractRequest,
+    controllerApis: ControllerApis
+  )(
+    implicit classTag: ClassTag[T],
+    @nowarn("cat=unused") nn: NotNothing[T]
+  ): T = {
+    val req = buildRequest(request)
+
+    controllerApis.handle(req, RequestLocal.NoCaching)
+
+    val capturedResponse: ArgumentCaptor[AbstractResponse] =
+      ArgumentCaptor.forClass(classOf[AbstractResponse])
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(req),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+
+    capturedResponse.getValue match {
+      case response: T => response
+      case response =>
+        throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, " +
+          s"but found ${response.getClass}")
+    }
+  }
+
   @AfterEach
   def tearDown(): Unit = {
     quotas.shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1ddd63a..8528511 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,6 +23,7 @@ import java.util
 import java.util.Arrays.asList
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Optional, Properties, Random}
+
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, Partition}
 import kafka.controller.{ControllerContext, KafkaController}
@@ -68,7 +69,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
-import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
 import org.easymock.EasyMock._
 import org.easymock.{Capture, EasyMock, IAnswer}
@@ -469,6 +470,12 @@ class KafkaApisTest {
   }
 
   @Test
+  def testElectLeadersForwarding(): Unit = {
+    val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000)
+    testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
+  }
+
+  @Test
   def testDescribeQuorumNotAllowedForZkClusters(): Unit = {
     val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
     val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
@@ -495,6 +502,18 @@ class KafkaApisTest {
     )
   }
 
+  private def testKraftForwarding(
+    apiKey: ApiKeys,
+    requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
+  ): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+    testForwardableApi(
+      createKafkaApis(enableForwarding = true, raftSupport = true),
+      apiKey,
+      requestBuilder
+    )
+  }
+
   private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
     testForwardableApi(
       createKafkaApis(enableForwarding = true),
@@ -511,7 +530,8 @@ class KafkaApisTest {
     val topicHeader = new RequestHeader(apiKey, apiKey.latestVersion,
       clientId, 0)
 
-    val request = buildRequest(requestBuilder.build(topicHeader.apiVersion))
+    val apiRequest = requestBuilder.build(topicHeader.apiVersion)
+    val request = buildRequest(apiRequest)
 
     if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
       // The controller check only makes sense for ZK clusters. For KRaft,
@@ -520,18 +540,28 @@ class KafkaApisTest {
       EasyMock.expect(controller.isActive).andReturn(false)
     }
 
-    expectNoThrottling(request)
+    val capturedResponse = expectNoThrottling(request)
+    val forwardCallback: Capture[Option[AbstractResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(forwardingManager.forwardRequest(
       EasyMock.eq(request),
-      anyObject[Option[AbstractResponse] => Unit]()
+      EasyMock.capture(forwardCallback)
     )).once()
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
 
     kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
+    assertNotNull(request.buffer, "The buffer was unexpectedly deallocated after " +
+      s"`handle` returned (is $apiKey marked as forwardable in `ApiKeys`?)")
+
+    val expectedResponse = apiRequest.getErrorResponse(Errors.NOT_CONTROLLER.exception)
+    assertTrue(forwardCallback.hasCaptured)
+    forwardCallback.getValue.apply(Some(expectedResponse))
 
-    EasyMock.verify(controller, forwardingManager)
+    assertTrue(capturedResponse.hasCaptured)
+    assertEquals(expectedResponse, capturedResponse.getValue)
+
+    EasyMock.verify(controller, requestChannel, forwardingManager)
   }
 
   private def authorizeResource(authorizer: Authorizer,
@@ -3980,13 +4010,13 @@ class KafkaApisTest {
     request
   }
 
-  private def verifyShouldNeverHandle(handler: RequestChannel.Request => Unit): Unit = {
+  private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
     val request = createMockRequest()
     val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
     assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
   }
 
-  private def verifyShouldAlwaysForward(handler: RequestChannel.Request => Unit): Unit = {
+  private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
     val request = createMockRequest()
     val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
     assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
@@ -3995,126 +4025,132 @@ class KafkaApisTest {
   @Test
   def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
+    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
+    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
+    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
+    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleAlterIsrRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
+    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleEnvelope(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
+    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateAcls)
   }
 
   @Test
   def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteAcls)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleUpdateFeatures)
+  }
+
+  @Test
+  def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleElectLeaders)
   }
 
   @Test
   def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 64003b7..06c7569 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -17,6 +17,7 @@
 package kafka.utils
 
 import java.io._
+import java.net.InetAddress
 import java.nio._
 import java.nio.channels._
 import java.nio.charset.{Charset, StandardCharsets}
@@ -26,12 +27,12 @@ import java.time.Duration
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
-import com.yammer.metrics.core.Meter
 
+import com.yammer.metrics.core.Meter
 import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{ControllerEventManager, LeaderIsrAndControllerEpoch}
 import kafka.log._
 import kafka.metrics.KafkaYammerMetrics
 import kafka.network.RequestChannel
@@ -44,6 +45,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.ConfigResource
@@ -53,9 +55,9 @@ import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader}
@@ -73,14 +75,13 @@ import org.apache.zookeeper.data.ACL
 import org.junit.jupiter.api.Assertions._
 import org.mockito.Mockito
 
-import java.net.InetAddress
-
 import scala.annotation.nowarn
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Map, Seq, mutable}
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success, Try}
 
 /**
  * Utility functions to help with testing
@@ -1625,19 +1626,37 @@ object TestUtils extends Logging {
     waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+    waitUntilTrue(() => {
+      val nodes = client.describeCluster().nodes().get()
+      nodes.asScala.exists(_.id == brokerId)
+    }, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+    client: Admin,
+    topicPartition: TopicPartition,
+    expectedLeaderOpt: Option[Int]
+  ): Unit = {
     val topic = topicPartition.topic
-    val partition = topicPartition.partition
+    val partitionId = topicPartition.partition
+
+    def currentLeader: Try[Option[Int]] = Try {
+      val topicDescription = client.describeTopics(List(topic).asJava).allTopicNames.get.get(topic)
+      topicDescription.partitions.asScala
+        .find(_.partition == partitionId)
+        .flatMap(partitionState => Option(partitionState.leader))
+        .map(_.id)
+    }
 
-    waitUntilTrue(() => {
-      try {
-        val topicResult = client.describeTopics(Arrays.asList(topic)).allTopicNames.get.get(topic)
-        val partitionResult = topicResult.partitions.get(partition)
-        Option(partitionResult.leader).map(_.id) == leader
-      } catch {
-        case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
-      }
-    }, "Timed out waiting for leader metadata")
+    val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) {
+      case Success(leaderOpt) => leaderOpt == expectedLeaderOpt
+      case Failure(e: ExecutionException) if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
+      case Failure(e) => throw e
+    }
+
+    assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " +
+      s"Last metadata lookup returned leader = ${lastLeaderCheck.getOrElse("unknown")}")
   }
 
   def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = {
@@ -1652,7 +1671,7 @@ object TestUtils extends Logging {
 
         brokerIds.intersect(isr).isEmpty
       },
-      s"Expected brokers $brokerIds to no longer in the ISR for $partition"
+      s"Expected brokers $brokerIds to no longer be in the ISR for $partition"
     )
   }
 
@@ -1917,4 +1936,28 @@ object TestUtils extends Logging {
     )
   }
 
+  def verifyNoUnexpectedThreads(context: String): Unit = {
+    // Threads which may cause transient failures in subsequent tests if not shutdown.
+    // These include threads which make connections to brokers and may cause issues
+    // when broker ports are reused (e.g. auto-create topics) as well as threads
+    // which reset static JAAS configuration.
+    val unexpectedThreadNames = Set(
+      ControllerEventManager.ControllerEventThreadName,
+      KafkaProducer.NETWORK_THREAD_PREFIX,
+      AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+      AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+      ZooKeeperTestHarness.ZkClientEventThreadSuffix
+    )
+
+    def unexpectedThreads: Set[String] = {
+      val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
+      allThreads.filter(t => unexpectedThreadNames.exists(s => t.contains(s))).toSet
+    }
+
+    val (unexpected, _) = TestUtils.computeUntilTrue(unexpectedThreads)(_.isEmpty)
+    assertTrue(unexpected.isEmpty,
+      s"Found ${unexpected.size} unexpected threads during $context: " +
+        s"${unexpected.mkString("`", ",", "`")}")
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 8b61c0e..a81cbb9 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,19 +19,12 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
-import org.junit.jupiter.api.Assertions._
 import org.apache.kafka.common.security.JaasUtils
-
-import scala.collection.Set
-import scala.jdk.CollectionConverters._
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
-import kafka.controller.ControllerEventManager
-import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
 
 @Tag("integration")
 abstract class ZooKeeperTestHarness extends Logging {
@@ -87,16 +80,6 @@ abstract class ZooKeeperTestHarness extends Logging {
 object ZooKeeperTestHarness {
   val ZkClientEventThreadSuffix = "-EventThread"
 
-  // Threads which may cause transient failures in subsequent tests if not shutdown.
-  // These include threads which make connections to brokers and may cause issues
-  // when broker ports are reused (e.g. auto-create topics) as well as threads
-  // which reset static JAAS configuration.
-  val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName,
-                                  KafkaProducer.NETWORK_THREAD_PREFIX,
-                                  AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
-                                  AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
-                                  ZkClientEventThreadSuffix)
-
   /**
    * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
    * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
@@ -104,7 +87,7 @@ object ZooKeeperTestHarness {
    */
   @BeforeAll
   def setUpClass(): Unit = {
-    verifyNoUnexpectedThreads("@BeforeClass")
+    TestUtils.verifyNoUnexpectedThreads("@BeforeAll")
   }
 
   /**
@@ -112,19 +95,7 @@ object ZooKeeperTestHarness {
    */
   @AfterAll
   def tearDownClass(): Unit = {
-    verifyNoUnexpectedThreads("@AfterClass")
+    TestUtils.verifyNoUnexpectedThreads("@AfterAll")
   }
 
-  /**
-   * Verifies that threads which are known to cause transient failures in subsequent tests
-   * have been shutdown.
-   */
-  def verifyNoUnexpectedThreads(context: String): Unit = {
-    def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
-    val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
-      threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
-    }
-    assertTrue(noUnexpected, s"Found unexpected threads during $context, allThreads=$threads, " +
-      s"unexpected=${threads.filterNot(t => unexpectedThreadNames.forall(s => !t.contains(s)))}")
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index a0eb1ea..5cfb9ee 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -25,6 +25,7 @@ import scala.collection.Seq
 import com.yammer.metrics.core.{Gauge, Meter, MetricName}
 import kafka.server.KafkaConfig
 import kafka.metrics.KafkaYammerMetrics
+import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Time
@@ -33,7 +34,7 @@ import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
-import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertTrue, assertThrows, fail}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertThrows, assertTrue, fail}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.jdk.CollectionConverters._
@@ -46,7 +47,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @BeforeEach
   override def setUp(): Unit = {
-    ZooKeeperTestHarness.verifyNoUnexpectedThreads("@BeforeEach")
+    TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
     cleanMetricsRegistry()
     super.setUp()
     zooKeeperClient = newZooKeeperClient()
@@ -58,7 +59,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       zooKeeperClient.close()
     super.tearDown()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    ZooKeeperTestHarness.verifyNoUnexpectedThreads("@AfterEach")
+    TestUtils.verifyNoUnexpectedThreads("@AfterEach")
   }
 
   @Test
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index c04a8fb..5a63094 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -35,11 +33,11 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FeatureMapAndEpoch;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.slf4j.Logger;
@@ -52,7 +50,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 5092ba1..cf0f6bf 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,6 +40,8 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
  * PartitionChangeBuilder handles changing partition registrations.
  */
 public class PartitionChangeBuilder {
+    private static final Logger log = LoggerFactory.getLogger(PartitionChangeBuilder.class);
+
     public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
         if (record.isr() != null) return false;
         if (record.leader() != NO_LEADER_CHANGE) return false;
@@ -141,12 +145,15 @@ public class PartitionChangeBuilder {
     private void tryElection(PartitionChangeRecord record) {
         BestLeader bestLeader = new BestLeader();
         if (bestLeader.node != partition.leader) {
+            log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node);
             record.setLeader(bestLeader.node);
             if (bestLeader.unclean) {
                 // If the election was unclean, we have to forcibly set the ISR to just the
                 // new leader. This can result in data loss!
                 record.setIsr(Collections.singletonList(bestLeader.node));
             }
+        } else {
+            log.debug("Failed to find a new leader with current state: {}", this);
         }
     }
 
@@ -240,4 +247,20 @@ public class PartitionChangeBuilder {
                 PARTITION_CHANGE_RECORD.highestSupportedVersion()));
         }
     }
+
+    @Override
+    public String toString() {
+        return "PartitionChangeBuilder(" +
+            "partition=" + partition +
+            ", topicId=" + topicId +
+            ", partitionId=" + partitionId +
+            ", isAcceptableLeader=" + isAcceptableLeader +
+            ", uncleanElectionOk=" + uncleanElectionOk +
+            ", targetIsr=" + targetIsr +
+            ", targetReplicas=" + targetReplicas +
+            ", targetRemoving=" + targetRemoving +
+            ", targetAdding=" + targetAdding +
+            ", alwaysElectPreferredIfPossible=" + alwaysElectPreferredIfPossible +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 3066f4b..4450e25 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -132,6 +132,14 @@ public class ReplicationControlManager {
             this.id = id;
             this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
         }
+
+        public String name() {
+            return name;
+        }
+
+        public Uuid topicId() {
+            return id;
+        }
     }
 
     private final SnapshotRegistry snapshotRegistry;
@@ -587,6 +595,11 @@ public class ReplicationControlManager {
     }
 
     // VisibleForTesting
+    TopicControlInfo getTopic(Uuid topicId) {
+        return topics.get(topicId);
+    }
+
+    // VisibleForTesting
     BrokersToIsrs brokersToIsrs() {
         return brokersToIsrs;
     }
@@ -787,7 +800,7 @@ public class ReplicationControlManager {
     }
 
     ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
-        boolean uncleanOk = electionTypeIsUnclean(request.electionType());
+        ElectionType electionType = electionType(request.electionType());
         List<ApiMessageAndVersion> records = new ArrayList<>();
         ElectLeadersResponseData response = new ElectLeadersResponseData();
         if (request.topicPartitions() == null) {
@@ -804,11 +817,16 @@ public class ReplicationControlManager {
                 TopicControlInfo topic = topics.get(topicEntry.getValue());
                 if (topic != null) {
                     for (int partitionId : topic.parts.keySet()) {
-                        ApiError error = electLeader(topicName, partitionId, uncleanOk, records);
-                        topicResults.partitionResult().add(new PartitionResult().
-                            setPartitionId(partitionId).
-                            setErrorCode(error.error().code()).
-                            setErrorMessage(error.message()));
+                        ApiError error = electLeader(topicName, partitionId, electionType, records);
+
+                        // When electing leaders for all partitions, we do not return
+                        // partitions which already have the desired leader.
+                        if (error.error() != Errors.ELECTION_NOT_NEEDED) {
+                            topicResults.partitionResult().add(new PartitionResult().
+                                setPartitionId(partitionId).
+                                setErrorCode(error.error().code()).
+                                setErrorMessage(error.message()));
+                        }
                     }
                 }
             }
@@ -818,7 +836,7 @@ public class ReplicationControlManager {
                     new ReplicaElectionResult().setTopic(topic.topic());
                 response.replicaElectionResults().add(topicResults);
                 for (int partitionId : topic.partitions()) {
-                    ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
+                    ApiError error = electLeader(topic.topic(), partitionId, electionType, records);
                     topicResults.partitionResult().add(new PartitionResult().
                         setPartitionId(partitionId).
                         setErrorCode(error.error().code()).
@@ -829,17 +847,15 @@ public class ReplicationControlManager {
         return ControllerResult.of(records, response);
     }
 
-    static boolean electionTypeIsUnclean(byte electionType) {
-        ElectionType type;
+    private static ElectionType electionType(byte electionType) {
         try {
-            type = ElectionType.valueOf(electionType);
+            return ElectionType.valueOf(electionType);
         } catch (IllegalArgumentException e) {
             throw new InvalidRequestException("Unknown election type " + (int) electionType);
         }
-        return type == ElectionType.UNCLEAN;
     }
 
-    ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
+    ApiError electLeader(String topic, int partitionId, ElectionType electionType,
                          List<ApiMessageAndVersion> records) {
         Uuid topicId = topicsByName.get(topic);
         if (topicId == null) {
@@ -856,21 +872,24 @@ public class ReplicationControlManager {
             return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
+        if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
+            || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
+            return new ApiError(Errors.ELECTION_NOT_NEEDED);
+        }
+
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
-        builder.setAlwaysElectPreferredIfPossible(true);
+            () -> electionType == ElectionType.UNCLEAN);
+
+        builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED);
         Optional<ApiMessageAndVersion> record = builder.build();
         if (!record.isPresent()) {
-            if (partition.leader == NO_LEADER) {
-                // If we can't find any leader for the partition, return an error.
-                return new ApiError(Errors.LEADER_NOT_AVAILABLE,
-                    "Unable to find any leader for the partition.");
+            if (electionType == ElectionType.PREFERRED) {
+                return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
             } else {
-                // There is nothing to do.
-                return ApiError.NONE;
+                return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
             }
         }
         records.add(record.get());
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
index e159732..4b574b3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
@@ -81,4 +81,11 @@ public final class ClientQuotasDelta {
         }
         return new ClientQuotasImage(newEntities);
     }
+
+    @Override
+    public String toString() {
+        return "ClientQuotasDelta(" +
+            "changes=" + changes +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index 90f61d7..6c48b8e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -126,4 +126,11 @@ public final class ClusterDelta {
         }
         return new ClusterImage(newBrokers);
     }
+
+    @Override
+    public String toString() {
+        return "ClusterDelta(" +
+            "changedBrokers=" + changedBrokers +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
index 4b3aa0b..677f764 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
@@ -74,4 +74,13 @@ public final class ConfigurationDelta {
         }
         return new ConfigurationImage(newData);
     }
+
+    @Override
+    public String toString() {
+        // Values are intentionally left out of this so that sensitive configs
+        // do not end up in logging by mistake.
+        return "ConfigurationDelta(" +
+            "changedKeys=" + changes.keySet() +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
index df3b0ff..d0f5848 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
@@ -96,4 +96,11 @@ public final class ConfigurationsDelta {
         }
         return new ConfigurationsImage(newData);
     }
+
+    @Override
+    public String toString() {
+        return "ConfigurationsDelta(" +
+            "changes=" + changes +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 940697e..781c496 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -83,4 +83,11 @@ public final class FeaturesDelta {
         }
         return new FeaturesImage(newFinalizedVersions);
     }
+
+    @Override
+    public String toString() {
+        return "FeaturesDelta(" +
+            "changes=" + changes +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 6be0dd6..4b9451b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -256,4 +256,15 @@ public final class MetadataDelta {
         return new MetadataImage(newFeatures, newCluster, newTopics, newConfigs,
             newClientQuotas);
     }
+
+    @Override
+    public String toString() {
+        return "MetadataDelta(" +
+            "featuresDelta=" + featuresDelta +
+            ", clusterDelta=" + clusterDelta +
+            ", topicsDelta=" + topicsDelta +
+            ", configsDelta=" + configsDelta +
+            ", clientQuotasDelta=" + clientQuotasDelta +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 88fef30..3214a0c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -138,4 +138,11 @@ public final class TopicDelta {
 
         return new LocalReplicaChanges(deletes, leaders, followers);
     }
+
+    @Override
+    public String toString() {
+        return "TopicDelta(" +
+            "partitionChanges=" + partitionChanges +
+            ')';
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index a146bba..f9d8087 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -201,4 +201,12 @@ public final class TopicsDelta {
 
         return new LocalReplicaChanges(deletes, leaders, followers);
     }
+
+    @Override
+    public String toString() {
+        return "TopicsDelta(" +
+            "changedTopics=" + changedTopics +
+            ", deletedTopicIds=" + deletedTopicIds +
+            ')';
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index 1b013f4..f935a80 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -107,6 +107,14 @@ public class PartitionChangeBuilderTest {
             shouldTryElection());
         assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).
             shouldTryElection());
+
+        assertTrue(createFooBuilder(true)
+            .setTargetIsr(Arrays.asList(3))
+            .shouldTryElection());
+        assertTrue(createFooBuilder(true)
+            .setTargetIsr(Arrays.asList(4))
+            .setTargetReplicas(Arrays.asList(2, 1, 3, 4))
+            .shouldTryElection());
     }
 
     private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 09449d3..78b0995 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.controller;
 
 import java.util.Optional;
+import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
@@ -37,12 +38,12 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
 import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCollection;
@@ -50,9 +51,10 @@ import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
-import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -63,20 +65,21 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.metadata.BrokerHeartbeatReply;
-import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,18 +87,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
+import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
 import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
 import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
 import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
 import static org.apache.kafka.common.protocol.Errors.NONE;
 import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
+import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
 import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -117,7 +128,7 @@ public class ReplicationControlManagerTest {
         final MockTime time = new MockTime();
         final MockRandom random = new MockRandom();
         final ClusterControlManager clusterControl = new ClusterControlManager(
-            logContext, time, snapshotRegistry, BROKER_SESSION_TIMEOUT_MS,
+            logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
             new StripedReplicaPlacer(random));
         final ControllerMetrics metrics = new MockControllerMetrics();
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
@@ -200,7 +211,45 @@ public class ReplicationControlManagerTest {
             }
         }
 
-        void unfenceBrokers(Integer... brokerIds) throws Exception {
+        void alterIsr(
+            TopicIdPartition topicIdPartition,
+            int leaderId,
+            List<Integer> isr
+        ) throws Exception {
+            BrokerRegistration registration = clusterControl.brokerRegistrations().get(leaderId);
+            assertFalse(registration.fenced());
+
+            PartitionRegistration partition = replicationControl.getPartition(
+                topicIdPartition.topicId(),
+                topicIdPartition.partitionId()
+            );
+            assertNotNull(partition);
+            assertEquals(leaderId, partition.leader);
+
+            PartitionData partitionData = new PartitionData()
+                .setPartitionIndex(topicIdPartition.partitionId())
+                .setCurrentIsrVersion(partition.partitionEpoch)
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setNewIsr(isr);
+
+            String topicName = replicationControl.getTopic(topicIdPartition.topicId()).name();
+            TopicData topicData = new TopicData()
+                .setName(topicName)
+                .setPartitions(singletonList(partitionData));
+
+            ControllerResult<AlterIsrResponseData> alterIsr = replicationControl.alterIsr(
+                new AlterIsrRequestData()
+                    .setBrokerId(leaderId)
+                    .setBrokerEpoch(registration.epoch())
+                    .setTopics(singletonList(topicData)));
+            replay(alterIsr.records());
+        }
+
+        void unfenceBrokers(Integer... brokerIds)  throws Exception {
+            unfenceBrokers(Utils.mkSet(brokerIds));
+        }
+
+        void unfenceBrokers(Set<Integer> brokerIds) throws Exception {
             for (int brokerId : brokerIds) {
                 ControllerResult<BrokerHeartbeatReply> result = replicationControl.
                     processBrokerHeartbeat(new BrokerHeartbeatRequestData().
@@ -213,6 +262,19 @@ public class ReplicationControlManagerTest {
             }
         }
 
+        void alterTopicConfig(
+            String topic,
+            String configKey,
+            String configValue
+        ) throws Exception {
+            ConfigRecord configRecord = new ConfigRecord()
+                .setResourceType(ConfigResource.Type.TOPIC.id())
+                .setResourceName(topic)
+                .setName(configKey)
+                .setValue(configValue);
+            replay(singletonList(new ApiMessageAndVersion(configRecord, (short) 0)));
+        }
+
         void fenceBrokers(Set<Integer> brokerIds) throws Exception {
             time.sleep(BROKER_SESSION_TIMEOUT_MS);
 
@@ -284,10 +346,10 @@ public class ReplicationControlManagerTest {
                 setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage()));
         assertEquals(expectedResponse3, result3.response());
         Uuid fooId = result2.response().topics().find("foo").topicId();
-        RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
-            Arrays.asList(new ApiMessageAndVersion(new PartitionRecord().
+        RecordTestUtils.assertBatchIteratorContains(asList(
+            asList(new ApiMessageAndVersion(new PartitionRecord().
                     setPartitionId(0).setTopicId(fooId).
-                    setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).
+                    setReplicas(asList(1, 2, 0)).setIsr(asList(1, 2, 0)).
                     setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
                     setLeaderEpoch(0).setPartitionEpoch(0), (short) 0),
                 new ApiMessageAndVersion(new TopicRecord().
@@ -450,7 +512,7 @@ public class ReplicationControlManagerTest {
         assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
         long brokerEpoch = ctx.currentBrokerEpoch(0);
         PartitionData shrinkIsrRequest = newAlterIsrPartition(
-            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+            replicationControl, topicIdPartition, asList(0, 1));
         ControllerResult<AlterIsrResponseData> shrinkIsrResult = sendAlterIsr(
             replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
         AlterIsrResponseData.PartitionData shrinkIsrResponse = assertAlterIsrResponse(
@@ -458,7 +520,7 @@ public class ReplicationControlManagerTest {
         assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
 
         PartitionData expandIsrRequest = newAlterIsrPartition(
-            replicationControl, topicIdPartition, Arrays.asList(0, 1, 2));
+            replicationControl, topicIdPartition, asList(0, 1, 2));
         ControllerResult<AlterIsrResponseData> expandIsrResult = sendAlterIsr(
             replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
         AlterIsrResponseData.PartitionData expandIsrResponse = assertAlterIsrResponse(
@@ -482,7 +544,7 @@ public class ReplicationControlManagerTest {
 
         // Invalid leader
         PartitionData invalidLeaderRequest = newAlterIsrPartition(
-            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+            replicationControl, topicIdPartition, asList(0, 1));
         ControllerResult<AlterIsrResponseData> invalidLeaderResult = sendAlterIsr(
             replicationControl, 1, ctx.currentBrokerEpoch(1),
             "foo", invalidLeaderRequest);
@@ -490,13 +552,13 @@ public class ReplicationControlManagerTest {
 
         // Stale broker epoch
         PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
-            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+            replicationControl, topicIdPartition, asList(0, 1));
         assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr(
             replicationControl, 0, brokerEpoch - 1, "foo", invalidBrokerEpochRequest));
 
         // Invalid leader epoch
         PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
-            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+            replicationControl, topicIdPartition, asList(0, 1));
         invalidLeaderEpochRequest.setLeaderEpoch(500);
         ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = sendAlterIsr(
             replicationControl, 1, ctx.currentBrokerEpoch(1),
@@ -505,8 +567,8 @@ public class ReplicationControlManagerTest {
 
         // Invalid ISR (3 is not a valid replica)
         PartitionData invalidIsrRequest1 = newAlterIsrPartition(
-            replicationControl, topicIdPartition, Arrays.asList(0, 1));
-        invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
+            replicationControl, topicIdPartition, asList(0, 1));
+        invalidIsrRequest1.setNewIsr(asList(0, 1, 3));
         ControllerResult<AlterIsrResponseData> invalidIsrResult1 = sendAlterIsr(
             replicationControl, 1, ctx.currentBrokerEpoch(1),
             "foo", invalidIsrRequest1);
@@ -514,8 +576,8 @@ public class ReplicationControlManagerTest {
 
         // Invalid ISR (does not include leader 0)
         PartitionData invalidIsrRequest2 = newAlterIsrPartition(
-            replicationControl, topicIdPartition, Arrays.asList(0, 1));
-        invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
+            replicationControl, topicIdPartition, asList(0, 1));
+        invalidIsrRequest2.setNewIsr(asList(1, 2));
         ControllerResult<AlterIsrResponseData> invalidIsrResult2 = sendAlterIsr(
             replicationControl, 1, ctx.currentBrokerEpoch(1),
             "foo", invalidIsrRequest2);
@@ -647,28 +709,28 @@ public class ReplicationControlManagerTest {
         assertNull(replicationControl.getPartition(topicId, 3));
         assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
 
-        assertEquals(Collections.singletonMap(topicId, new ResultOrError<>("foo")),
+        assertEquals(singletonMap(topicId, new ResultOrError<>("foo")),
             replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
-        assertEquals(Collections.singletonMap("foo", new ResultOrError<>(topicId)),
+        assertEquals(singletonMap("foo", new ResultOrError<>(topicId)),
             replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
         Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1,
             topicId.getLeastSignificantBits());
-        assertEquals(Collections.singletonMap(invalidId,
+        assertEquals(singletonMap(invalidId,
             new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))),
                 replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
-        assertEquals(Collections.singletonMap("bar",
+        assertEquals(singletonMap("bar",
             new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
                 replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
 
         ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.
             deleteTopics(Collections.singletonList(invalidId));
         assertEquals(0, invalidDeleteResult.records().size());
-        assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
+        assertEquals(singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
             invalidDeleteResult.response());
         ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
             deleteTopics(Collections.singletonList(topicId));
         assertTrue(deleteResult.isAtomic());
-        assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
+        assertEquals(singletonMap(topicId, new ApiError(NONE, null)),
             deleteResult.response());
         assertEquals(1, deleteResult.records().size());
         ctx.replay(deleteResult.records());
@@ -676,10 +738,10 @@ public class ReplicationControlManagerTest {
         assertNull(replicationControl.getPartition(topicId, 1));
         assertNull(replicationControl.getPartition(topicId, 2));
         assertNull(replicationControl.getPartition(topicId, 3));
-        assertEquals(Collections.singletonMap(topicId, new ResultOrError<>(
+        assertEquals(singletonMap(topicId, new ResultOrError<>(
             new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(
                 Long.MAX_VALUE, Collections.singleton(topicId)));
-        assertEquals(Collections.singletonMap("foo", new ResultOrError<>(
+        assertEquals(singletonMap("foo", new ResultOrError<>(
             new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(
                 Long.MAX_VALUE, Collections.singleton("foo")));
         assertEmptyTopicConfigs(ctx, "foo");
@@ -715,7 +777,7 @@ public class ReplicationControlManagerTest {
             setName("quux").setCount(2).setAssignments(null));
         ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
             replicationControl.createPartitions(topics);
-        assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
+        assertEquals(asList(new CreatePartitionsTopicResult().
                 setName("foo").
                 setErrorCode(NONE.code()).
                 setErrorMessage(null),
@@ -735,20 +797,20 @@ public class ReplicationControlManagerTest {
         ctx.replay(createPartitionsResult.records());
         List<CreatePartitionsTopic> topics2 = new ArrayList<>();
         topics2.add(new CreatePartitionsTopic().
-            setName("foo").setCount(6).setAssignments(Arrays.asList(
-                new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
+            setName("foo").setCount(6).setAssignments(asList(
+                new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
         topics2.add(new CreatePartitionsTopic().
-            setName("bar").setCount(5).setAssignments(Arrays.asList(
-            new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1)))));
+            setName("bar").setCount(5).setAssignments(asList(
+            new CreatePartitionsAssignment().setBrokerIds(asList(1)))));
         topics2.add(new CreatePartitionsTopic().
-            setName("quux").setCount(4).setAssignments(Arrays.asList(
-            new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
+            setName("quux").setCount(4).setAssignments(asList(
+            new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
         topics2.add(new CreatePartitionsTopic().
-            setName("foo2").setCount(3).setAssignments(Arrays.asList(
-            new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0)))));
+            setName("foo2").setCount(3).setAssignments(asList(
+            new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
         ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult2 =
             replicationControl.createPartitions(topics2);
-        assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
+        assertEquals(asList(new CreatePartitionsTopicResult().
                 setName("foo").
                 setErrorCode(NONE.code()).
                 setErrorMessage(null),
@@ -775,13 +837,13 @@ public class ReplicationControlManagerTest {
     public void testValidateGoodManualPartitionAssignments() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
         ctx.registerBrokers(1, 2, 3);
-        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
+        ctx.replicationControl.validateManualPartitionAssignment(asList(1),
             OptionalInt.of(1));
-        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
+        ctx.replicationControl.validateManualPartitionAssignment(asList(1),
             OptionalInt.empty());
-        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
+        ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
             OptionalInt.of(3));
-        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
+        ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
             OptionalInt.empty());
     }
 
@@ -791,20 +853,20 @@ public class ReplicationControlManagerTest {
         ctx.registerBrokers(1, 2);
         assertEquals("The manual partition assignment includes an empty replica list.",
             assertThrows(InvalidReplicaAssignmentException.class, () ->
-                ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(),
+                ctx.replicationControl.validateManualPartitionAssignment(asList(),
                     OptionalInt.empty())).getMessage());
         assertEquals("The manual partition assignment includes broker 3, but no such " +
             "broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () ->
-                ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
+                ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
                     OptionalInt.empty())).getMessage());
         assertEquals("The manual partition assignment includes the broker 2 more than " +
             "once.", assertThrows(InvalidReplicaAssignmentException.class, () ->
-                ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 2),
+                ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 2),
                     OptionalInt.empty())).getMessage());
         assertEquals("The manual partition assignment includes a partition with 2 " +
             "replica(s), but this is not consistent with previous partitions, which have " +
                 "3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () ->
-                    ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2),
+                    ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2),
                         OptionalInt.of(3))).getMessage());
     }
 
@@ -824,18 +886,18 @@ public class ReplicationControlManagerTest {
         assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
         ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
             replication.alterPartitionReassignments(
-                new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
-                    new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                new AlterPartitionReassignmentsRequestData().setTopics(asList(
+                    new ReassignableTopic().setName("foo").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
-                            setReplicas(Arrays.asList(3, 2, 1)),
+                            setReplicas(asList(3, 2, 1)),
                         new ReassignablePartition().setPartitionIndex(1).
-                            setReplicas(Arrays.asList(0, 2, 1)),
+                            setReplicas(asList(0, 2, 1)),
                         new ReassignablePartition().setPartitionIndex(2).
-                            setReplicas(Arrays.asList(0, 2, 1)))),
+                            setReplicas(asList(0, 2, 1)))),
                 new ReassignableTopic().setName("bar"))));
         assertEquals(new AlterPartitionReassignmentsResponseData().
-                setErrorMessage(null).setResponses(Arrays.asList(
-                    new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                setErrorMessage(null).setResponses(asList(
+                    new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                         new ReassignablePartitionResponse().setPartitionIndex(0).
                             setErrorMessage(null),
                         new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -849,41 +911,41 @@ public class ReplicationControlManagerTest {
         ctx.replay(alterResult.records());
         ListPartitionReassignmentsResponseData currentReassigning =
             new ListPartitionReassignmentsResponseData().setErrorMessage(null).
-                setTopics(Arrays.asList(new OngoingTopicReassignment().
-                    setName("foo").setPartitions(Arrays.asList(
+                setTopics(asList(new OngoingTopicReassignment().
+                    setName("foo").setPartitions(asList(
                     new OngoingPartitionReassignment().setPartitionIndex(1).
-                        setRemovingReplicas(Arrays.asList(3)).
-                        setAddingReplicas(Arrays.asList(0)).
-                        setReplicas(Arrays.asList(0, 2, 1, 3))))));
+                        setRemovingReplicas(asList(3)).
+                        setAddingReplicas(asList(0)).
+                        setReplicas(asList(0, 2, 1, 3))))));
         assertEquals(currentReassigning, replication.listPartitionReassignments(null));
-        assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
+        assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
                 new ListPartitionReassignmentsTopics().setName("bar").
-                    setPartitionIndexes(Arrays.asList(0, 1, 2)))));
-        assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
+                    setPartitionIndexes(asList(0, 1, 2)))));
+        assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
             new ListPartitionReassignmentsTopics().setName("foo").
-                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+                setPartitionIndexes(asList(0, 1, 2)))));
         ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
             replication.alterPartitionReassignments(
-                new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
-                    new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                new AlterPartitionReassignmentsRequestData().setTopics(asList(
+                    new ReassignableTopic().setName("foo").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
                             setReplicas(null),
                         new ReassignablePartition().setPartitionIndex(1).
                             setReplicas(null),
                         new ReassignablePartition().setPartitionIndex(2).
                             setReplicas(null))),
-                    new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                    new ReassignableTopic().setName("bar").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
                             setReplicas(null))))));
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
             new PartitionChangeRecord().setTopicId(fooId).
                 setPartitionId(1).
-                setReplicas(Arrays.asList(2, 1, 3)).
+                setReplicas(asList(2, 1, 3)).
                 setLeader(3).
                 setRemovingReplicas(Collections.emptyList()).
                 setAddingReplicas(Collections.emptyList()), (short) 0)),
-            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
-                new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
+                new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                     new ReassignablePartitionResponse().setPartitionIndex(0).
                         setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
                     new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -891,7 +953,7 @@ public class ReplicationControlManagerTest {
                     new ReassignablePartitionResponse().setPartitionIndex(2).
                         setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
                         setErrorMessage("Unable to find partition foo:2."))),
-                new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+                new ReassignableTopicResponse().setName("bar").setPartitions(asList(
                     new ReassignablePartitionResponse().setPartitionIndex(0).
                         setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
                         setErrorMessage(null)))))),
@@ -899,11 +961,11 @@ public class ReplicationControlManagerTest {
         log.info("running final alterIsr...");
         ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
             new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103).
-                setTopics(Arrays.asList(new TopicData().setName("foo").setPartitions(Arrays.asList(
+                setTopics(asList(new TopicData().setName("foo").setPartitions(asList(
                     new PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).
-                        setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
-        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
-            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+                        setLeaderEpoch(0).setNewIsr(asList(3, 0, 2, 1)))))));
+        assertEquals(new AlterIsrResponseData().setTopics(asList(
+            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
                 new AlterIsrResponseData.PartitionData().
                     setPartitionIndex(1).
                     setErrorCode(FENCED_LEADER_EPOCH.code()))))),
@@ -931,22 +993,22 @@ public class ReplicationControlManagerTest {
             new int[] {}, new int[] {}, 1, 1, 1), replication.getPartition(fooId, 0));
         ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
             replication.alterPartitionReassignments(
-                new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
-                    new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                new AlterPartitionReassignmentsRequestData().setTopics(asList(
+                    new ReassignableTopic().setName("foo").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
-                            setReplicas(Arrays.asList(1, 2, 3)),
+                            setReplicas(asList(1, 2, 3)),
                         new ReassignablePartition().setPartitionIndex(1).
-                            setReplicas(Arrays.asList(1, 2, 3, 0)),
+                            setReplicas(asList(1, 2, 3, 0)),
                         new ReassignablePartition().setPartitionIndex(2).
-                            setReplicas(Arrays.asList(5, 6, 7)),
+                            setReplicas(asList(5, 6, 7)),
                         new ReassignablePartition().setPartitionIndex(3).
-                            setReplicas(Arrays.asList()))),
-                new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                            setReplicas(asList()))),
+                new ReassignableTopic().setName("bar").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
-                            setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
+                            setReplicas(asList(1, 2, 3, 4, 0)))))));
         assertEquals(new AlterPartitionReassignmentsResponseData().
-                setErrorMessage(null).setResponses(Arrays.asList(
-            new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                setErrorMessage(null).setResponses(asList(
+            new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorMessage(null),
                 new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -959,7 +1021,7 @@ public class ReplicationControlManagerTest {
                     setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
                     setErrorMessage("The manual partition assignment includes an empty " +
                         "replica list."))),
-            new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+            new ReassignableTopicResponse().setName("bar").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorMessage(null))))),
             alterResult.response());
@@ -972,55 +1034,55 @@ public class ReplicationControlManagerTest {
             new int[] {}, new int[] {0, 1}, 4, 1, 2), replication.getPartition(barId, 0));
         ListPartitionReassignmentsResponseData currentReassigning =
             new ListPartitionReassignmentsResponseData().setErrorMessage(null).
-                setTopics(Arrays.asList(new OngoingTopicReassignment().
-                    setName("bar").setPartitions(Arrays.asList(
+                setTopics(asList(new OngoingTopicReassignment().
+                    setName("bar").setPartitions(asList(
                     new OngoingPartitionReassignment().setPartitionIndex(0).
                         setRemovingReplicas(Collections.emptyList()).
-                        setAddingReplicas(Arrays.asList(0, 1)).
-                        setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
+                        setAddingReplicas(asList(0, 1)).
+                        setReplicas(asList(1, 2, 3, 4, 0))))));
         assertEquals(currentReassigning, replication.listPartitionReassignments(null));
-        assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
+        assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
             new ListPartitionReassignmentsTopics().setName("foo").
-                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
-        assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
+                setPartitionIndexes(asList(0, 1, 2)))));
+        assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
             new ListPartitionReassignmentsTopics().setName("bar").
-                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+                setPartitionIndexes(asList(0, 1, 2)))));
         ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
             new AlterIsrRequestData().setBrokerId(4).setBrokerEpoch(104).
-                setTopics(Arrays.asList(new TopicData().setName("bar").setPartitions(Arrays.asList(
+                setTopics(asList(new TopicData().setName("bar").setPartitions(asList(
                     new PartitionData().setPartitionIndex(0).setCurrentIsrVersion(2).
-                        setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 3, 0)))))));
-        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
-            new AlterIsrResponseData.TopicData().setName("bar").setPartitions(Arrays.asList(
+                        setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 3, 0)))))));
+        assertEquals(new AlterIsrResponseData().setTopics(asList(
+            new AlterIsrResponseData.TopicData().setName("bar").setPartitions(asList(
                 new AlterIsrResponseData.PartitionData().
                     setPartitionIndex(0).
                     setLeaderId(4).
                     setLeaderEpoch(1).
-                    setIsr(Arrays.asList(4, 1, 2, 3, 0)).
+                    setIsr(asList(4, 1, 2, 3, 0)).
                     setCurrentIsrVersion(3).
                     setErrorCode(NONE.code()))))),
             alterIsrResult.response());
         ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
             replication.alterPartitionReassignments(
-                new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
-                    new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                new AlterPartitionReassignmentsRequestData().setTopics(asList(
+                    new ReassignableTopic().setName("foo").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
                             setReplicas(null))),
-                    new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                    new ReassignableTopic().setName("bar").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
                             setReplicas(null))))));
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
                 new PartitionChangeRecord().setTopicId(barId).
                     setPartitionId(0).
                     setLeader(4).
-                    setReplicas(Arrays.asList(2, 3, 4)).
+                    setReplicas(asList(2, 3, 4)).
                     setRemovingReplicas(null).
                     setAddingReplicas(Collections.emptyList()), (short) 0)),
-            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
-                new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
+                new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                     new ReassignablePartitionResponse().setPartitionIndex(0).
                         setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
-                new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+                new ReassignableTopicResponse().setName("bar").setPartitions(asList(
                     new ReassignablePartitionResponse().setPartitionIndex(0).
                         setErrorMessage(null)))))),
             cancelResult);
@@ -1052,6 +1114,160 @@ public class ReplicationControlManagerTest {
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    private void assertLeaderAndIsr(
+        ReplicationControlManager replication,
+        TopicIdPartition topicIdPartition,
+        int leaderId,
+        int[] isr
+    ) {
+        PartitionRegistration registration = replication.getPartition(
+            topicIdPartition.topicId(),
+            topicIdPartition.partitionId()
+        );
+        assertArrayEquals(isr, registration.isr);
+        assertEquals(leaderId, registration.leader);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+        TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
+        TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
+        TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+
+        assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
+        assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
+        assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+        ElectLeadersRequestData request = buildElectLeadersRequest(
+            ElectionType.UNCLEAN,
+            electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
+        );
+
+        // No election can be done yet because no replicas are available for partition 0
+        ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request);
+        assertEquals(Collections.emptyList(), result1.records());
+
+        ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                new ApiError(ELECTION_NOT_NEEDED)
+            )
+        ));
+        assertElectLeadersResponse(expectedResponse1, result1.response());
+
+        // Now we bring 2 back online which should allow the unclean election of partition 0
+        ctx.unfenceBrokers(Utils.mkSet(2));
+
+        // Bring 2 back into the ISR for partition 1. This allows us to verify that
+        // preferred election does not occur as a result of the unclean election request.
+        ctx.alterIsr(partition1, 4, asList(2, 4));
+
+        ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
+        assertEquals(1, result.records().size());
+
+        ApiMessageAndVersion record = result.records().get(0);
+        assertTrue(record.message() instanceof PartitionChangeRecord);
+
+        PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord) record.message();
+        assertEquals(0, partitionChangeRecord.partitionId());
+        assertEquals(2, partitionChangeRecord.leader());
+        assertEquals(singletonList(2), partitionChangeRecord.isr());
+        ctx.replay(result.records());
+
+        assertLeaderAndIsr(replication, partition0, 2, new int[]{2});
+        assertLeaderAndIsr(replication, partition1, 4, new int[]{2, 4});
+        assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+        ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                ApiError.NONE
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                new ApiError(ELECTION_NOT_NEEDED)
+            )
+        ));
+        assertElectLeadersResponse(expectedResponse, result.response());
+    }
+
+    @Test
+    public void testPreferredElectionDoesNotTriggerUncleanElection() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(1, 2, 3, 4);
+        ctx.unfenceBrokers(1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{new int[]{1, 2, 3}}).topicId();
+        TopicIdPartition partition = new TopicIdPartition(fooId, 0);
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+        ctx.unfenceBrokers(Utils.mkSet(2));
+
+        assertLeaderAndIsr(replication, partition, NO_LEADER, new int[]{1});
+
+        ctx.alterTopicConfig("foo", "unclean.leader.election.enable", "true");
+
+        ElectLeadersRequestData request = buildElectLeadersRequest(
+            ElectionType.PREFERRED,
+            singletonMap("foo", singletonList(0))
+        );
+
+        // No election should be done even though unclean election is available
+        ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
+        assertEquals(Collections.emptyList(), result.records());
+
+        ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, false, singletonMap(
+            new TopicPartition("foo", 0), new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
+        ));
+        assertEquals(expectedResponse, result.response());
+    }
+
+    private ElectLeadersRequestData buildElectLeadersRequest(
+        ElectionType electionType,
+        Map<String, List<Integer>> partitions
+    ) {
+        ElectLeadersRequestData request = new ElectLeadersRequestData().
+            setElectionType(electionType.value);
+
+        if (partitions == null) {
+            request.setTopicPartitions(null);
+        } else {
+            partitions.forEach((topic, partitionIds) -> {
+                request.topicPartitions().add(new TopicPartitions()
+                    .setTopic(topic)
+                    .setPartitions(partitionIds)
+                );
+            });
+        }
+        return request;
+    }
+
     @Test
     public void testFenceMultipleBrokers() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -1083,7 +1299,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testElectLeaders() throws Exception {
+    public void testElectPreferredLeaders() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -1091,60 +1307,130 @@ public class ReplicationControlManagerTest {
         Uuid fooId = ctx.createTestTopic("foo", new int[][]{
             new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
         ElectLeadersRequestData request1 = new ElectLeadersRequestData().
-            setElectionType((byte) 0).
-            setTopicPartitions(new TopicPartitionsCollection(Arrays.asList(
+            setElectionType(ElectionType.PREFERRED.value).
+            setTopicPartitions(new TopicPartitionsCollection(asList(
                 new TopicPartitions().setTopic("foo").
-                    setPartitions(Arrays.asList(0, 1)),
+                    setPartitions(asList(0, 1)),
                 new TopicPartitions().setTopic("bar").
-                    setPartitions(Arrays.asList(0, 1))).iterator()));
+                    setPartitions(asList(0, 1))).iterator()));
         ControllerResult<ElectLeadersResponseData> election1Result =
             replication.electLeaders(request1);
-        ElectLeadersResponseData expectedResponse1 = new ElectLeadersResponseData().
-            setErrorCode((short) 0).
-            setReplicaElectionResults(Arrays.asList(
-                new ReplicaElectionResult().setTopic("foo").
-                    setPartitionResult(Arrays.asList(
-                        new PartitionResult().setPartitionId(0).
-                            setErrorCode(NONE.code()).
-                            setErrorMessage(null),
-                        new PartitionResult().setPartitionId(1).
-                            setErrorCode(NONE.code()).
-                            setErrorMessage(null))),
-                new ReplicaElectionResult().setTopic("bar").
-                    setPartitionResult(Arrays.asList(
-                        new PartitionResult().setPartitionId(0).
-                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
-                            setErrorMessage("No such topic as bar"),
-                        new PartitionResult().setPartitionId(1).
-                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
-                            setErrorMessage("No such topic as bar")))));
-        assertEquals(expectedResponse1, election1Result.response());
+        ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 0),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 1),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            )
+        ));
+        assertElectLeadersResponse(expectedResponse1, election1Result.response());
         assertEquals(Collections.emptyList(), election1Result.records());
         ctx.unfenceBrokers(0, 1);
 
         ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
             new AlterIsrRequestData().setBrokerId(2).setBrokerEpoch(102).
-                setTopics(Arrays.asList(new AlterIsrRequestData.TopicData().setName("foo").
-                    setPartitions(Arrays.asList(new AlterIsrRequestData.PartitionData().
+                setTopics(asList(new AlterIsrRequestData.TopicData().setName("foo").
+                    setPartitions(asList(new AlterIsrRequestData.PartitionData().
                         setPartitionIndex(0).setCurrentIsrVersion(0).
-                        setLeaderEpoch(0).setNewIsr(Arrays.asList(1, 2, 3)))))));
-        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
-            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+                        setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
+        assertEquals(new AlterIsrResponseData().setTopics(asList(
+            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
                 new AlterIsrResponseData.PartitionData().
                     setPartitionIndex(0).
                     setLeaderId(2).
                     setLeaderEpoch(0).
-                    setIsr(Arrays.asList(1, 2, 3)).
+                    setIsr(asList(1, 2, 3)).
                     setCurrentIsrVersion(1).
                     setErrorCode(NONE.code()))))),
             alterIsrResult.response());
+
+        ElectLeadersResponseData expectedResponse2 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                ApiError.NONE
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 0),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 1),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            )
+        ));
+
         ctx.replay(alterIsrResult.records());
         ControllerResult<ElectLeadersResponseData> election2Result =
             replication.electLeaders(request1);
-        assertEquals(expectedResponse1, election2Result.response());
-        assertEquals(Arrays.asList(new ApiMessageAndVersion(new PartitionChangeRecord().
+        assertElectLeadersResponse(expectedResponse2, election2Result.response());
+        assertEquals(asList(new ApiMessageAndVersion(new PartitionChangeRecord().
             setPartitionId(0).
             setTopicId(fooId).
             setLeader(1), (short) 0)), election2Result.records());
     }
+
+    private void assertElectLeadersResponse(
+        ElectLeadersResponseData expected,
+        ElectLeadersResponseData actual
+    ) {
+        assertEquals(Errors.forCode(expected.errorCode()), Errors.forCode(actual.errorCode()));
+        assertEquals(collectElectLeadersErrors(expected), collectElectLeadersErrors(actual));
+    }
+
+    private Map<TopicPartition, PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
+        Map<TopicPartition, PartitionResult> res = new HashMap<>();
+        response.replicaElectionResults().forEach(topicResult -> {
+            String topic = topicResult.topic();
+            topicResult.partitionResult().forEach(partitionResult -> {
+                TopicPartition topicPartition = new TopicPartition(topic, partitionResult.partitionId());
+                res.put(topicPartition, partitionResult);
+            });
+        });
+        return res;
+    }
+
+    private ElectLeadersResponseData buildElectLeadersResponse(
+        Errors topLevelError,
+        boolean electAllPartitions,
+        Map<TopicPartition, ApiError> errors
+    ) {
+        Map<String, List<Map.Entry<TopicPartition, ApiError>>> errorsByTopic = errors.entrySet().stream()
+            .collect(Collectors.groupingBy(entry -> entry.getKey().topic()));
+
+        ElectLeadersResponseData response = new ElectLeadersResponseData()
+            .setErrorCode(topLevelError.code());
+
+        errorsByTopic.forEach((topic, partitionErrors) -> {
+            ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic);
+            electionResult.setPartitionResult(partitionErrors.stream()
+                .filter(entry -> !electAllPartitions || entry.getValue().error() != ELECTION_NOT_NEEDED)
+                .map(entry -> {
+                    TopicPartition topicPartition = entry.getKey();
+                    ApiError error = entry.getValue();
+                    return new PartitionResult()
+                        .setPartitionId(topicPartition.partition())
+                        .setErrorCode(error.error().code())
+                        .setErrorMessage(error.message());
+                })
+                .collect(Collectors.toList()));
+            response.replicaElectionResults().add(electionResult);
+        });
+
+        return response;
+    }
+
 }