Posted to commits@kafka.apache.org by da...@apache.org on 2022/06/14 11:12:54 UTC

[kafka] branch trunk updated: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2) (#12181)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f83d95d9a2 KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2) (#12181)
f83d95d9a2 is described below

commit f83d95d9a28267f7ef7a7b1e584dcdb4aa842210
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Jun 14 13:12:45 2022 +0200

    KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2) (#12181)
    
    This patch implements [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft). Specifically, it makes the following changes:
    * It introduces the INELIGIBLE_REPLICA and NEW_LEADER_ELECTED error codes.
    * The KRaft controller validates the new ISR provided in the AlterPartition request and rejects the call if any replica in the new ISR is not eligible to join the ISR, e.g. because it is fenced or shutting down. When its request is rejected for this reason, the leader reverts to the last committed ISR.
    * The partition leader also verifies that a replica is eligible before trying to add it back to the ISR. If the replica is not eligible, the ISR expansion is not triggered at all.
    * Updates the AlterPartition API to use topic ids, updates the AlterPartition manager to handle topic names/ids, and updates the ZK controller and the KRaft controller to handle topic names/ids depending on the version of the request used (see the sketch below).
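    
    For illustration only (not part of this commit), a minimal sketch of how a caller might build the request with the new builder. The broker id, epochs and ISR values are placeholders, and the PartitionData setters (setLeaderEpoch, setNewIsr, setPartitionEpoch) are assumed from the request schema above:
    
        import java.util.Arrays;
        import java.util.Collections;
        
        import org.apache.kafka.common.Uuid;
        import org.apache.kafka.common.message.AlterPartitionRequestData;
        import org.apache.kafka.common.requests.AlterPartitionRequest;
        
        public class AlterPartitionRequestSketch {
            // Builds an AlterPartitionRequest.Builder; all ids and epochs are placeholder values.
            static AlterPartitionRequest.Builder newBuilder(boolean canUseTopicIds) {
                AlterPartitionRequestData.PartitionData partition =
                    new AlterPartitionRequestData.PartitionData()
                        .setPartitionIndex(0)
                        .setLeaderEpoch(5)
                        .setNewIsr(Arrays.asList(1, 2, 3))
                        .setPartitionEpoch(10);
        
                AlterPartitionRequestData data = new AlterPartitionRequestData()
                    .setBrokerId(1)
                    .setBrokerEpoch(42L)
                    // Both the topic name and the topic id are set because the final version is
                    // only chosen at build time: versions 0-1 serialize TopicName, version 2+
                    // serializes TopicId.
                    .setTopics(Collections.singletonList(new AlterPartitionRequestData.TopicData()
                        .setTopicName("foo")                // placeholder topic name
                        .setTopicId(Uuid.randomUuid())      // placeholder topic id
                        .setPartitions(Collections.singletonList(partition))));
        
                // canUseTopicIds = false caps the allowed version at 1, the last version
                // without topic ids, so TopicName is serialized instead of TopicId.
                return new AlterPartitionRequest.Builder(data, canUseTopicIds);
            }
        }
    
    When the broker cannot use topic ids (for example when talking to an older controller), the builder falls back to version 1 and the controller resolves the partition by name instead.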
    
    Reviewers: Artem Livshits <84...@users.noreply.github.com>, José Armando García Sancio <js...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 .../common/errors/IneligibleReplicaException.java  |  23 ++
 .../common/errors/NewLeaderElectedException.java   |  23 ++
 .../org/apache/kafka/common/protocol/Errors.java   |   6 +-
 .../common/requests/AlterPartitionRequest.java     |  21 +-
 .../common/message/AlterPartitionRequest.json      |   9 +-
 .../common/message/AlterPartitionResponse.json     |  10 +-
 .../kafka/common/requests/RequestResponseTest.java |  10 +-
 core/src/main/scala/kafka/cluster/Partition.scala  | 185 +++++++----
 .../scala/kafka/controller/KafkaController.scala   | 351 +++++++++++----------
 .../scala/kafka/server/AlterPartitionManager.scala | 156 ++++++---
 .../main/scala/kafka/server/ControllerApis.scala   |  48 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   |   5 +-
 .../kafka/server/ZkAlterPartitionManager.scala     |  13 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |  10 +-
 .../unit/kafka/cluster/AbstractPartitionTest.scala |   2 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  13 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 207 ++++++++++++
 .../controller/ControllerIntegrationTest.scala     | 337 +++++++++++++++-----
 .../kafka/integration/KafkaServerTestHarness.scala |   4 +-
 .../kafka/server/AlterPartitionManagerTest.scala   | 322 +++++++++++++------
 .../unit/kafka/server/ControllerApisTest.scala     |   4 +-
 .../unit/kafka/server/MetadataCacheTest.scala      |  64 +++-
 .../server/ReplicaManagerConcurrencyTest.scala     |  32 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  12 +-
 .../kafka/controller/ControllerRequestContext.java |  29 +-
 .../apache/kafka/controller/QuorumController.java  |   2 +-
 .../controller/ReplicationControlManager.java      |  68 +++-
 .../authorizer/ClusterMetadataAuthorizer.java      |   4 +-
 .../controller/ControllerRequestContextUtil.java}  |  50 ++-
 .../kafka/controller/QuorumControllerTest.java     |   9 +-
 .../controller/ReplicationControlManagerTest.java  | 306 ++++++++++++++----
 32 files changed, 1709 insertions(+), 628 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IneligibleReplicaException.java b/clients/src/main/java/org/apache/kafka/common/errors/IneligibleReplicaException.java
new file mode 100644
index 0000000000..6c79add033
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/IneligibleReplicaException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class IneligibleReplicaException extends ApiException {
+    public IneligibleReplicaException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NewLeaderElectedException.java b/clients/src/main/java/org/apache/kafka/common/errors/NewLeaderElectedException.java
new file mode 100644
index 0000000000..20fd869df9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NewLeaderElectedException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class NewLeaderElectedException extends ApiException {
+    public NewLeaderElectedException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 5db9717906..2ca42bafcf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
 import org.apache.kafka.common.errors.InconsistentTopicIdException;
 import org.apache.kafka.common.errors.InconsistentVoterSetException;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
+import org.apache.kafka.common.errors.IneligibleReplicaException;
 import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidFetchSessionEpochException;
@@ -77,6 +78,7 @@ import org.apache.kafka.common.errors.ListenerNotFoundException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.MemberIdRequiredException;
 import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.NewLeaderElectedException;
 import org.apache.kafka.common.errors.NoReassignmentInProgressException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
@@ -364,7 +366,9 @@ public enum Errors {
     INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new),
     INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new),
     TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
-    FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new);
+    FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
+    INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
+    NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 091bed6173..2d246f2104 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -45,8 +45,8 @@ public class AlterPartitionRequest extends AbstractRequest {
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         return new AlterPartitionResponse(new AlterPartitionResponseData()
-                .setThrottleTimeMs(throttleTimeMs)
-                .setErrorCode(Errors.forException(e).code()));
+            .setThrottleTimeMs(throttleTimeMs)
+            .setErrorCode(Errors.forException(e).code()));
     }
 
     public static AlterPartitionRequest parse(ByteBuffer buffer, short version) {
@@ -57,8 +57,21 @@ public class AlterPartitionRequest extends AbstractRequest {
 
         private final AlterPartitionRequestData data;
 
-        public Builder(AlterPartitionRequestData data) {
-            super(ApiKeys.ALTER_PARTITION);
+        /**
+         * Constructs a builder for AlterPartitionRequest.
+         *
+         * @param data The data to be sent. Note that because the version of the
+         *             request is not known at this time, it is expected that all
+         *             topics have a topic id and a topic name set.
+         * @param canUseTopicIds True if version 2 and above can be used.
+         */
+        public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) {
+            super(
+                ApiKeys.ALTER_PARTITION,
+                ApiKeys.ALTER_PARTITION.oldestVersion(),
+                // Version 1 is the maximum version that can be used without topic ids.
+                canUseTopicIds ? ApiKeys.ALTER_PARTITION.latestVersion() : 1
+            );
             this.data = data;
         }
 
diff --git a/clients/src/main/resources/common/message/AlterPartitionRequest.json b/clients/src/main/resources/common/message/AlterPartitionRequest.json
index 97f02457de..d91f317f97 100644
--- a/clients/src/main/resources/common/message/AlterPartitionRequest.json
+++ b/clients/src/main/resources/common/message/AlterPartitionRequest.json
@@ -18,7 +18,10 @@
   "type": "request",
   "listeners": ["zkBroker", "controller"],
   "name": "AlterPartitionRequest",
-  "validVersions": "0-1",
+  // Version 1 adds LeaderRecoveryState field (KIP-704).
+  //
+  // Version 2 adds TopicId field to replace TopicName field (KIP-841).
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -26,8 +29,10 @@
     { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
       "about": "The epoch of the requesting broker" },
     { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
-      { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name":  "TopicName", "type": "string", "versions": "0-1", "ignorable": true, "entityType": "topicName",
         "about": "The name of the topic to alter ISRs for" },
+      { "name":  "TopicId", "type": "uuid", "versions": "2+", "ignorable": true,
+        "about": "The ID of the topic to alter ISRs for" },
       { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index" },
diff --git a/clients/src/main/resources/common/message/AlterPartitionResponse.json b/clients/src/main/resources/common/message/AlterPartitionResponse.json
index aaeb5cfad7..e8be99fd5e 100644
--- a/clients/src/main/resources/common/message/AlterPartitionResponse.json
+++ b/clients/src/main/resources/common/message/AlterPartitionResponse.json
@@ -17,7 +17,11 @@
   "apiKey": 56,
   "type": "response",
   "name": "AlterPartitionResponse",
-  "validVersions": "0-1",
+  // Version 1 adds LeaderRecoveryState field (KIP-704).
+  //
+  // Version 2 adds TopicId field to replace TopicName field, can return the following new errors:
+  // INELIGIBLE_REPLICA, NEW_LEADER_ELECTED and UNKNOWN_TOPIC_ID (KIP-841).
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -25,8 +29,10 @@
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top level response error code" },
     { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
-      { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "TopicName", "type": "string", "versions": "0-1", "ignorable": true, "entityType": "topicName",
         "about": "The name of the topic" },
+      { "name":  "TopicId", "type": "uuid", "versions": "2+", "ignorable": true,
+        "about": "The ID of the topic" },
       { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index" },
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 138a9c0333..13f47a9771 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1321,9 +1321,10 @@ public class RequestResponseTest {
             .setBrokerEpoch(123L)
             .setBrokerId(1)
             .setTopics(singletonList(new AlterPartitionRequestData.TopicData()
-                .setName("topic1")
+                .setTopicName("topic1")
+                .setTopicId(Uuid.randomUuid())
                 .setPartitions(singletonList(partitionData))));
-        return new AlterPartitionRequest.Builder(data).build(version);
+        return new AlterPartitionRequest.Builder(data, version >= 1).build(version);
     }
 
     private AlterPartitionResponse createAlterPartitionResponse(int version) {
@@ -1343,8 +1344,9 @@ public class RequestResponseTest {
                 .setErrorCode(Errors.NONE.code())
                 .setThrottleTimeMs(123)
                 .setTopics(singletonList(new AlterPartitionResponseData.TopicData()
-                        .setName("topic1")
-                        .setPartitions(singletonList(partitionData))));
+                    .setTopicName("topic1")
+                    .setTopicId(Uuid.randomUuid())
+                    .setPartitions(singletonList(partitionData))));
         return new AlterPartitionResponse(data);
     }
 
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 346d2ed184..046e21c1f9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -19,7 +19,6 @@ package kafka.cluster
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.Optional
 import java.util.concurrent.CompletableFuture
-
 import kafka.api.LeaderAndIsr
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.{KafkaController, StateChangeLogger}
@@ -27,9 +26,11 @@ import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
+import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zookeeper.ZooKeeperClientException
+import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -143,7 +144,6 @@ sealed trait PartitionState {
    * the high watermark as well as determining which replicas are required for acks=all produce requests.
    *
    * Only applicable as of IBP 2.7-IV2, for older versions this will return the committed ISR
-   *
    */
   def maximalIsr: Set[Int]
 
@@ -159,48 +159,61 @@ sealed trait PartitionState {
 }
 
 sealed trait PendingPartitionChange extends PartitionState {
+  def lastCommittedState: CommittedPartitionState
   def sentLeaderAndIsr: LeaderAndIsr
 
   override val leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+
+  def notifyListener(alterPartitionListener: AlterPartitionListener): Unit
 }
 
 case class PendingExpandIsr(
-  isr: Set[Int],
   newInSyncReplicaId: Int,
-  sentLeaderAndIsr: LeaderAndIsr
+  sentLeaderAndIsr: LeaderAndIsr,
+  lastCommittedState: CommittedPartitionState
 ) extends PendingPartitionChange {
+  val isr = lastCommittedState.isr
   val maximalIsr = isr + newInSyncReplicaId
   val isInflight = true
 
+  def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
+    alterPartitionListener.markIsrExpand()
+  }
+
   override def toString: String = {
-    s"PendingExpandIsr(isr=$isr" +
-    s", newInSyncReplicaId=$newInSyncReplicaId" +
+    s"PendingExpandIsr(newInSyncReplicaId=$newInSyncReplicaId" +
     s", sentLeaderAndIsr=$sentLeaderAndIsr" +
     s", leaderRecoveryState=$leaderRecoveryState" +
+    s", lastCommittedState=$lastCommittedState" +
     ")"
   }
 }
 
 case class PendingShrinkIsr(
-  isr: Set[Int],
   outOfSyncReplicaIds: Set[Int],
-  sentLeaderAndIsr: LeaderAndIsr
+  sentLeaderAndIsr: LeaderAndIsr,
+  lastCommittedState: CommittedPartitionState
 ) extends PendingPartitionChange  {
+  val isr = lastCommittedState.isr
   val maximalIsr = isr
   val isInflight = true
 
+  def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
+    alterPartitionListener.markIsrShrink()
+  }
+
   override def toString: String = {
-    s"PendingShrinkIsr(isr=$isr" +
-    s", outOfSyncReplicaIds=$outOfSyncReplicaIds" +
+    s"PendingShrinkIsr(outOfSyncReplicaIds=$outOfSyncReplicaIds" +
     s", sentLeaderAndIsr=$sentLeaderAndIsr" +
     s", leaderRecoveryState=$leaderRecoveryState" +
+    s", lastCommittedState=$lastCommittedState" +
     ")"
   }
 }
 
 case class CommittedPartitionState(
   isr: Set[Int],
-  override val leaderRecoveryState: LeaderRecoveryState
+  leaderRecoveryState: LeaderRecoveryState
 ) extends PartitionState {
   val maximalIsr = isr
   val isInflight = false
@@ -834,10 +847,11 @@ class Partition(val topicPartition: TopicPartition,
     if (needsIsrUpdate) {
       val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
         // check if this replica needs to be added to the ISR
-        if (!partitionState.isInflight && needsExpandIsr(followerReplica)) {
-          Some(prepareIsrExpand(followerReplica.brokerId))
-        } else {
-          None
+        partitionState match {
+          case currentState: CommittedPartitionState if needsExpandIsr(followerReplica) =>
+            Some(prepareIsrExpand(currentState, followerReplica.brokerId))
+          case _ =>
+            None
         }
       }
       // Send the AlterPartition request outside of the LeaderAndIsr lock since the completion logic
@@ -847,21 +861,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerAtHighwatermark(followerReplica)
+    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
   }
 
   private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
     val current = partitionState
-    !current.isInflight && !current.isr.contains(followerReplicaId)
+    !current.isInflight &&
+      !current.isr.contains(followerReplicaId) &&
+      isReplicaIsrEligible(followerReplicaId)
   }
 
-  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+  private def isFollowerInSync(followerReplica: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
       followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
     }
   }
 
+  private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
+    metadataCache match {
+      // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
+      // allowed to join the ISR. This does not apply to ZK mode.
+      case kRaftMetadataCache: KRaftMetadataCache =>
+        !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
+          !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
+
+      case _ => true
+    }
+  }
+
   /*
    * Returns a tuple where the first element is a boolean indicating whether enough replicas reached `requiredOffset`
    * and the second element is an error (which would be `Errors.NONE` for no error).
@@ -1009,21 +1037,22 @@ class Partition(val topicPartition: TopicPartition,
       val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
         leaderLogIfLocal.flatMap { leaderLog =>
           val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
-          if (!partitionState.isInflight && outOfSyncReplicaIds.nonEmpty) {
-            val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
-              val logEndOffsetMessage = getReplica(replicaId)
-                .map(_.stateSnapshot.logEndOffset.toString)
-                .getOrElse("unknown")
-              s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage)"
-            }.mkString(" ")
-            val newIsrLog = (partitionState.isr -- outOfSyncReplicaIds).mkString(",")
-            info(s"Shrinking ISR from ${partitionState.isr.mkString(",")} to $newIsrLog. " +
-              s"Leader: (highWatermark: ${leaderLog.highWatermark}, " +
-              s"endOffset: ${leaderLog.logEndOffset}). " +
-              s"Out of sync replicas: $outOfSyncReplicaLog.")
-            Some(prepareIsrShrink(outOfSyncReplicaIds))
-          } else {
-            None
+          partitionState match {
+            case currentState: CommittedPartitionState if outOfSyncReplicaIds.nonEmpty =>
+              val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
+                val logEndOffsetMessage = getReplica(replicaId)
+                  .map(_.stateSnapshot.logEndOffset.toString)
+                  .getOrElse("unknown")
+                s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage)"
+              }.mkString(" ")
+              val newIsrLog = (partitionState.isr -- outOfSyncReplicaIds).mkString(",")
+              info(s"Shrinking ISR from ${partitionState.isr.mkString(",")} to $newIsrLog. " +
+                s"Leader: (highWatermark: ${leaderLog.highWatermark}, " +
+                s"endOffset: ${leaderLog.logEndOffset}). " +
+                s"Out of sync replicas: $outOfSyncReplicaLog.")
+              Some(prepareIsrShrink(currentState, outOfSyncReplicaIds))
+            case _ =>
+              None
           }
         }
       }
@@ -1496,33 +1525,63 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def prepareIsrExpand(newInSyncReplicaId: Int): PendingExpandIsr = {
+  private def prepareIsrExpand(
+    currentState: CommittedPartitionState,
+    newInSyncReplicaId: Int
+  ): PendingExpandIsr = {
     // When expanding the ISR, we assume that the new replica will make it into the ISR
     // before we receive confirmation that it has. This ensures that the HW will already
     // reflect the updated ISR even if there is a delay before we receive the confirmation.
     // Alternatively, if the update fails, no harm is done since the expanded ISR puts
     // a stricter requirement for advancement of the HW.
     val isrToSend = partitionState.isr + newInSyncReplicaId
-    val newLeaderAndIsr = LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, partitionState.leaderRecoveryState, partitionEpoch)
-    val updatedState = PendingExpandIsr(partitionState.isr, newInSyncReplicaId, newLeaderAndIsr)
+    val newLeaderAndIsr = LeaderAndIsr(
+      localBrokerId,
+      leaderEpoch,
+      isrToSend.toList,
+      partitionState.leaderRecoveryState,
+      partitionEpoch
+    )
+    val updatedState = PendingExpandIsr(
+      newInSyncReplicaId,
+      newLeaderAndIsr,
+      currentState
+    )
     partitionState = updatedState
     updatedState
   }
 
-  private[cluster] def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
+  private[cluster] def prepareIsrShrink(
+    currentState: CommittedPartitionState,
+    outOfSyncReplicaIds: Set[Int]
+  ): PendingShrinkIsr = {
     // When shrinking the ISR, we cannot assume that the update will succeed as this could
     // erroneously advance the HW if the `AlterPartition` were to fail. Hence the "maximal ISR"
    // for `PendingShrinkIsr` is the current ISR.
     val isrToSend = partitionState.isr -- outOfSyncReplicaIds
-    val newLeaderAndIsr = LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, partitionState.leaderRecoveryState, partitionEpoch)
-    val updatedState = PendingShrinkIsr(partitionState.isr, outOfSyncReplicaIds, newLeaderAndIsr)
+    val newLeaderAndIsr = LeaderAndIsr(
+      localBrokerId,
+      leaderEpoch,
+      isrToSend.toList,
+      partitionState.leaderRecoveryState,
+      partitionEpoch
+    )
+    val updatedState = PendingShrinkIsr(
+      outOfSyncReplicaIds,
+      newLeaderAndIsr,
+      currentState
+    )
     partitionState = updatedState
     updatedState
   }
 
   private def submitAlterPartition(proposedIsrState: PendingPartitionChange): CompletableFuture[LeaderAndIsr] = {
     debug(s"Submitting ISR state change $proposedIsrState")
-    val future = alterIsrManager.submit(topicPartition, proposedIsrState.sentLeaderAndIsr, controllerEpoch)
+    val future = alterIsrManager.submit(
+      new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition),
+      proposedIsrState.sentLeaderAndIsr,
+      controllerEpoch
+    )
     future.whenComplete { (leaderAndIsr, e) =>
       var hwIncremented = false
       var shouldRetry = false
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
   ): Boolean = {
     alterPartitionListener.markFailed()
     error match {
-      case Errors.OPERATION_NOT_ATTEMPTED =>
-        // Since the operation was not attempted, it is safe to reset back to the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)
-        debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state $partitionState")
+      case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+        // Care must be taken when resetting to the last committed state since we may not
+        // know in general whether the request was applied or not taking into account retries
+        // and controller changes which might have occurred before we received the response.
+        // However, when the controller returns INELIGIBLE_REPLICA (or OPERATION_NOT_ATTEMPTED),
+        // the controller is explicitly telling us 1) that the current partition epoch is correct,
+        // and 2) that the request was not applied. Even if the controller that sent the response
+        // is stale, we are guaranteed from the monotonicity of the controller epoch that the
+        // request could not have been applied by any past or future controller.
+        partitionState = proposedIsrState.lastCommittedState
+        info(s"Failed to alter partition to $proposedIsrState since the controller rejected the request with $error. " +
+          s"Partition state has been reset to the latest committed state $partitionState.")
         false
       case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
         debug(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " +
-          "this topic or partition. Giving up.")
+          "this topic or partition. Partition state may be out of sync, awaiting new the latest metadata.")
+        false
+      case Errors.UNKNOWN_TOPIC_ID =>
+        debug(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " +
+          "this topic. Partition state may be out of sync, awaiting new the latest metadata.")
         false
       case Errors.FENCED_LEADER_EPOCH =>
-        debug(s"Failed to alter partition to $proposedIsrState since the leader epoch is old. Giving up.")
+        debug(s"Failed to alter partition to $proposedIsrState since the leader epoch is old. " +
+          "Partition state may be out of sync, awaiting new the latest metadata.")
         false
       case Errors.INVALID_UPDATE_VERSION =>
-        debug(s"Failed to alter partition to $proposedIsrState because the partition epoch is invalid. Giving up.")
+        debug(s"Failed to alter partition to $proposedIsrState because the partition epoch is invalid. " +
+          "Partition state may be out of sync, awaiting new the latest metadata.")
         false
       case Errors.INVALID_REQUEST =>
-        debug(s"Failed to alter partition to $proposedIsrState because the request is invalid. Giving up.")
+        debug(s"Failed to alter partition to $proposedIsrState because the request is invalid. " +
+          "Partition state may be out of sync, awaiting new the latest metadata.")
+        false
+      case Errors.NEW_LEADER_ELECTED =>
+        // The operation completed successfully but this replica got removed from the replica set by the controller
+        // while completing an ongoing reassignment. This replica is no longer the leader but it does not know it
+        // yet. It should remain in the current pending state until the metadata overrides it.
+        // This is only raised in KRaft mode.
+        debug(s"The alter partition request successfully updated the partition state to $proposedIsrState but " +
+          "this replica got removed from the replica set while completing a reassignment. " +
+          "Waiting on new metadata to clean up this replica.")
         false
       case _ =>
         warn(s"Failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
@@ -1624,10 +1706,7 @@ class Partition(val topicPartition: TopicPartition,
       partitionEpoch = leaderAndIsr.partitionEpoch
       info(s"ISR updated to ${partitionState.isr.mkString(",")} and version updated to $partitionEpoch")
 
-      proposedIsrState match {
-        case PendingExpandIsr(_, _, _) => alterPartitionListener.markIsrExpand()
-        case PendingShrinkIsr(_, _, _) => alterPartitionListener.markIsrShrink()
-      }
+      proposedIsrState.notifyListener(alterPartitionListener)
 
       // we may need to increment high watermark since ISR could be down to 1
       leaderLogIfLocal.exists(log => maybeIncrementLeaderHW(log))
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 289de9ab29..8d16eb7e1d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,12 +16,10 @@
  */
 package kafka.controller
 
-import java.util
 import java.util.concurrent.TimeUnit
 import kafka.admin.AdminOperationException
 import kafka.api._
 import kafka.common._
-import kafka.controller.KafkaController.AlterPartitionCallback
 import kafka.cluster.Broker
 import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback}
 import kafka.coordinator.transaction.ZkProducerIdManager
@@ -38,6 +36,7 @@ import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
 import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
 import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterPartitionRequestData, AlterPartitionResponseData}
 import org.apache.kafka.common.metrics.Metrics
@@ -66,7 +65,6 @@ object KafkaController extends Logging {
   type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit
   type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit
   type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit
-  type AlterPartitionCallback = Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors] => Unit
   type UpdateFeaturesCallback = Either[ApiError, Map[String, ApiError]] => Unit
 }
 
@@ -2225,197 +2223,226 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
+  }
 
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
+  private def processAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    val partitionResponses = try {
+      tryProcessAlterPartition(
+        alterPartitionRequest,
+        alterPartitionRequestVersion,
+        callback
+      )
+    } catch {
+      case e: Throwable =>
+        error(s"Error when processing AlterPartition: $alterPartitionRequest", e)
+        callback(new AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
+        mutable.Map.empty
     }
 
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe because it will always be the same
-                     * as the value set in the request. For version 0, that is always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
+    // After we have returned the result of the `AlterPartition` request, we should check whether
+    // there are any reassignments which can be completed by a successful ISR expansion.
+    partitionResponses.forKeyValue { (topicPartition, partitionResponse) =>
+      if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+        val isSuccessfulUpdate = partitionResponse.isRight
+        if (isSuccessfulUpdate) {
+          maybeCompleteReassignment(topicPartition)
+        }
       }
-      callback.apply(resp)
     }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
   }
 
-  private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
-  ): Unit = {
+  private def tryProcessAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] = {
+    val useTopicsIds = alterPartitionRequestVersion > 1
 
     // Handle a few short-circuits
     if (!isActive) {
-      callback.apply(Right(Errors.NOT_CONTROLLER))
-      return
+      callback(new AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
+      return mutable.Map.empty
     }
 
+    val brokerId = alterPartitionRequest.brokerId
+    val brokerEpoch = alterPartitionRequest.brokerEpoch
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterPartition due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
-      return
+      callback(new AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+      return mutable.Map.empty
     }
 
     if (!brokerEpochOpt.contains(brokerEpoch)) {
       info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch and local broker epoch $brokerEpochOpt for broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
-      return
+      callback(new AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+      return mutable.Map.empty
     }
 
-    val response = try {
-      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
-
-      // Determine which partitions we will accept the new ISR for
-      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = partitionsToAlter.flatMap {
-        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-          controllerContext.partitionLeadershipInfo(tp) match {
-            case Some(leaderIsrAndControllerEpoch) =>
-              val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-              if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
-                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-                None
-              } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
-                partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-                None
-              } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-                // If a partition is already in the desired state, just return it
-                partitionResponses(tp) = Right(currentLeaderAndIsr)
-                None
-              } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
-                  s"$newLeaderAndIsr"
-                )
-                None
-              } else if (currentLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
-                newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
-
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp because the leader recovery state cannot change from " +
-                  s"RECOVERED to RECOVERING: $newLeaderAndIsr"
-                )
-                None
-              } else {
-                Some(tp -> newLeaderAndIsr)
-              }
-            case None =>
-              partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
-              None
-          }
+    val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+    val alterPartitionResponse = new AlterPartitionResponseData()
+
+    alterPartitionRequest.topics.forEach { topicReq =>
+      val topicNameOpt = if (useTopicsIds) {
+        controllerContext.topicName(topicReq.topicId)
+      } else {
+        Some(topicReq.topicName)
       }
 
-      // Do the updates in ZK
-      debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
-      val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
-        adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
-
-      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
-        case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
-          isrOrError match {
-            case Right(updatedIsr) =>
-              debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.partitionEpoch}]")
-              partitionResponses(partition) = Right(updatedIsr)
-              Some(partition -> updatedIsr)
-            case Left(e) =>
-              error(s"Failed to update ISR for partition $partition", e)
-              partitionResponses(partition) = Left(Errors.forException(e))
-              None
+      topicNameOpt match {
+        case None =>
+          val topicResponse = new AlterPartitionResponseData.TopicData()
+            .setTopicId(topicReq.topicId)
+          alterPartitionResponse.topics.add(topicResponse)
+          topicReq.partitions.forEach { partitionReq =>
+            topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+              .setPartitionIndex(partitionReq.partitionIndex)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
           }
-      }
 
-      badVersionUpdates.foreach { partition =>
-        info(s"Failed to update ISR to ${adjustedIsrs(partition)} for partition $partition, bad ZK version.")
-        partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+        case Some(topicName) =>
+          topicReq.partitions.forEach { partitionReq =>
+            partitionsToAlter.put(
+              new TopicPartition(topicName, partitionReq.partitionIndex),
+              LeaderAndIsr(
+                alterPartitionRequest.brokerId,
+                partitionReq.leaderEpoch,
+                partitionReq.newIsr.asScala.toList.map(_.toInt),
+                LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
+                partitionReq.partitionEpoch
+              )
+            )
+          }
       }
+    }
+
+    val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+    // Determine which partitions we will accept the new ISR for
+    val adjustedIsrs = partitionsToAlter.flatMap { case (tp, newLeaderAndIsr) =>
+      controllerContext.partitionLeadershipInfo(tp) match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+          if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+            partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
+            None
+          } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
+            partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+            None
+          } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+            // If a partition is already in the desired state, just return it
+            partitionResponses(tp) = Right(currentLeaderAndIsr)
+            None
+          } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+            partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+            info(
+              s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
+              s"$newLeaderAndIsr"
+            )
+            None
+          } else if (currentLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
+            newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
+
+            partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+            info(
+              s"Rejecting AlterPartition from node $brokerId for $tp because the leader recovery state cannot change from " +
+              s"RECOVERED to RECOVERING: $newLeaderAndIsr"
+            )
+            None
+          } else {
+            Some(tp -> newLeaderAndIsr)
+          }
 
-      def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
-        val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
-        sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
+        case None =>
+          partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          None
       }
+    }
 
-      // Update our cache and send out metadata updates
-      updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
-      processUpdateNotifications(partitionsToAlter.keys.toSeq)
+    // Do the updates in ZK
+    debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+    val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+      adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+    val successfulUpdates = finishedUpdates.flatMap { case (partition, isrOrError) =>
+      isrOrError match {
+        case Right(updatedIsr) =>
+          debug(s"ISR for partition $partition updated to $updatedIsr.")
+          partitionResponses(partition) = Right(updatedIsr)
+          Some(partition -> updatedIsr)
+        case Left(e) =>
+          error(s"Failed to update ISR for partition $partition", e)
+          partitionResponses(partition) = Left(Errors.forException(e))
+          None
+      }
+    }
 
-      Left(partitionResponses)
-    } catch {
-      case e: Throwable =>
-        error(s"Error when processing AlterPartition for partitions: ${partitionsToAlter.keys.toSeq}", e)
-        Right(Errors.UNKNOWN_SERVER_ERROR)
+    badVersionUpdates.foreach { partition =>
+      info(s"Failed to update ISR to ${adjustedIsrs(partition)} for partition $partition, bad ZK version.")
+      partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
     }
 
-    callback.apply(response)
+    // Update our cache and send out metadata updates
+    updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
+    sendUpdateMetadataRequest(
+      controllerContext.liveOrShuttingDownBrokerIds.toSeq,
+      partitionsToAlter.keySet
+    )
 
-    // After we have returned the result of the `AlterPartition` request, we should check whether
-    // there are any reassignments which can be completed by a successful ISR expansion.
-    response.left.foreach { alterPartitionResponses =>
-      alterPartitionResponses.forKeyValue { (topicPartition, partitionResponse) =>
-        if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-          val isSuccessfulUpdate = partitionResponse.isRight
-          if (isSuccessfulUpdate) {
-            maybeCompleteReassignment(topicPartition)
-          }
+    partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, partitionResponses) =>
+      // Add each topic part to the response
+      val topicResponse = if (useTopicsIds) {
+        new AlterPartitionResponseData.TopicData()
+          .setTopicId(controllerContext.topicIds.getOrElse(topicName, Uuid.ZERO_UUID))
+      } else {
+        new AlterPartitionResponseData.TopicData()
+          .setTopicName(topicName)
+      }
+      alterPartitionResponse.topics.add(topicResponse)
+
+      partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+        // Add each partition part to the response (new ISR or error)
+        errorOrIsr match {
+          case Left(error) =>
+            topicResponse.partitions.add(
+              new AlterPartitionResponseData.PartitionData()
+                .setPartitionIndex(tp.partition)
+                .setErrorCode(error.code))
+          case Right(leaderAndIsr) =>
+            /* Setting the LeaderRecoveryState field is always safe because it will always be the same
+             * as the value set in the request. For version 0, that is always the default RECOVERED
+             * which is ignored when serializing to version 0. For any other version, the
+             * LeaderRecoveryState field is supported.
+             */
+            topicResponse.partitions.add(
+              new AlterPartitionResponseData.PartitionData()
+                .setPartitionIndex(tp.partition)
+                .setLeaderId(leaderAndIsr.leader)
+                .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
+                .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+            )
         }
       }
     }
+
+    callback(alterPartitionResponse)
+
+    partitionResponses
   }
 
   def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
@@ -2542,8 +2569,8 @@ class KafkaController(val config: KafkaConfig,
           processPartitionReassignmentIsrChange(partition)
         case IsrChangeNotification =>
           processIsrChangeNotification()
-        case AlterPartitionReceived(brokerId, brokerEpoch, partitionsToAlter, callback) =>
-          processAlterPartition(brokerId, brokerEpoch, partitionsToAlter, callback)
+        case AlterPartitionReceived(alterPartitionRequest, alterPartitionRequestVersion, callback) =>
+          processAlterPartition(alterPartitionRequest, alterPartitionRequestVersion, callback)
         case AllocateProducerIds(brokerId, brokerEpoch, callback) =>
           processAllocateProducerIds(brokerId, brokerEpoch, callback)
         case Startup =>
@@ -2806,7 +2833,9 @@ case object IsrChangeNotification extends ControllerEvent {
 }
 
 case class AlterPartitionReceived(
-  brokerId: Int, brokerEpoch: Long, partitionsToAlter: Map[TopicPartition, LeaderAndIsr], callback: AlterPartitionCallback
+  alterPartitionRequest: AlterPartitionRequestData,
+  alterPartitionRequestVersion: Short,
+  callback: AlterPartitionResponseData => Unit
 ) extends ControllerEvent {
   override def state: ControllerState = ControllerState.IsrChange
   override def preempt(): Unit = {}
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index ecae4dd276..d791e0a0dd 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -19,17 +19,19 @@ package kafka.server
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit}
-
 import kafka.api.LeaderAndIsr
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{KafkaScheduler, Logging, Scheduler}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.OperationNotAttemptedException
 import org.apache.kafka.common.message.AlterPartitionRequestData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.LeaderRecoveryState
@@ -54,14 +56,14 @@ trait AlterPartitionManager {
   def shutdown(): Unit = {}
 
   def submit(
-    topicPartition: TopicPartition,
+    topicIdPartition: TopicIdPartition,
     leaderAndIsr: LeaderAndIsr,
     controllerEpoch: Int
   ): CompletableFuture[LeaderAndIsr]
 }
 
 case class AlterPartitionItem(
-  topicPartition: TopicPartition,
+  topicIdPartition: TopicIdPartition,
   leaderAndIsr: LeaderAndIsr,
   future: CompletableFuture[LeaderAndIsr],
   controllerEpoch: Int // controllerEpoch needed for `ZkAlterPartitionManager`
@@ -112,7 +114,6 @@ object AlterPartitionManager {
   ): AlterPartitionManager = {
     new ZkAlterPartitionManager(scheduler, time, zkClient)
   }
-
 }
 
 class DefaultAlterPartitionManager(
@@ -124,7 +125,20 @@ class DefaultAlterPartitionManager(
   val metadataVersionSupplier: () => MetadataVersion
 ) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
 
-  // Used to allow only one pending ISR update per partition (visible for testing)
+  // Used to allow only one pending ISR update per partition (visible for testing).
+  // Note that we key items by TopicPartition despite using TopicIdPartition while
+  // submitting changes. We do this to ensure that topics with the same name but
+  // with a different topic id or no topic id collide here. There are two cases to
+  // consider:
+  // 1) When the cluster is upgraded from IBP < 2.8 to IBP >= 2.8, the ZK controller
+  //    assigns topic ids to the partitions. So partitions will start sending updates
+  //    with a topic id while they might still have updates without topic ids in this
+  //    Map. This would break the contract of only allowing one pending ISR update per
+  //    partition.
+  // 2) When a topic is deleted and re-created, we cannot have two entries in this Map,
+  //    especially if, in the end, we cannot use an AlterPartition request version which
+  //    supports topic ids, because the two updates with the same name would be merged
+  //    together.
   private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, AlterPartitionItem]()
 
   // Used to allow only one in-flight request at a time
@@ -139,18 +153,18 @@ class DefaultAlterPartitionManager(
   }
 
   override def submit(
-    topicPartition: TopicPartition,
+    topicIdPartition: TopicIdPartition,
     leaderAndIsr: LeaderAndIsr,
     controllerEpoch: Int
   ): CompletableFuture[LeaderAndIsr] = {
     val future = new CompletableFuture[LeaderAndIsr]()
-    val alterPartitionItem = AlterPartitionItem(topicPartition, leaderAndIsr, future, controllerEpoch)
-    val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicPartition, alterPartitionItem) == null
+    val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch)
+    val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null
     if (enqueued) {
       maybePropagateIsrChanges()
     } else {
       future.completeExceptionally(new OperationNotAttemptedException(
-        s"Failed to enqueue ISR change state $leaderAndIsr for partition $topicPartition"))
+        s"Failed to enqueue ISR change state $leaderAndIsr for partition $topicIdPartition"))
     }
     future
   }
@@ -172,13 +186,14 @@ class DefaultAlterPartitionManager(
   }
 
   private def sendRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): Unit = {
-    val message = buildRequest(inflightAlterPartitionItems)
-    debug(s"Sending AlterPartition to controller $message")
+    val brokerEpoch = brokerEpochSupplier()
+    val (request, topicNamesByIds) = buildRequest(inflightAlterPartitionItems, brokerEpoch)
+    debug(s"Sending AlterPartition to controller $request")
 
     // We will not timeout AlterPartition request, instead letting it retry indefinitely
     // until a response is received, or a new LeaderAndIsr overwrites the existing isrState
     // which causes the response for those partitions to be ignored.
-    controllerChannelManager.sendRequest(new AlterPartitionRequest.Builder(message),
+    controllerChannelManager.sendRequest(request,
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterPartition response $response")
@@ -192,8 +207,13 @@ class DefaultAlterPartitionManager(
             } else if (response.versionMismatch != null) {
               Errors.UNSUPPORTED_VERSION
             } else {
-              val body = response.responseBody().asInstanceOf[AlterPartitionResponse]
-              handleAlterPartitionResponse(body, message.brokerEpoch, inflightAlterPartitionItems)
+              handleAlterPartitionResponse(
+                response.requestHeader,
+                response.responseBody.asInstanceOf[AlterPartitionResponse],
+                brokerEpoch,
+                inflightAlterPartitionItems,
+                topicNamesByIds
+              )
             }
           } finally {
             // clear the flag so future requests can proceed
@@ -217,36 +237,74 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  /**
+   * Builds an AlterPartition request.
+   *
+   * While building the request, we don't know which version of the AlterPartition API is
+   * supported by the controller. The final decision is taken when the AlterPartitionRequest
+   * is built in the network client based on the advertised api versions of the controller.
+   *
+   * We could use version 2 or above if all the pending changes have a topic id defined;
+   * otherwise we must use version 1 or below.
+   *
+   * @return A tuple containing the AlterPartitionRequest.Builder and a mapping from
+   *         topic id to topic name. This mapping is used in the response handling.
+   */
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+    brokerEpoch: Long
+  ): (AlterPartitionRequest.Builder, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    // We build this mapping in order to map topic ids back to their names when we
+    // receive the response. We cannot rely on the metadata cache for this because
+    // the metadata cache is updated after the partition state so it might not know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+    // We can use topic ids only if all the pending changes have one defined and
+    // we use IBP 2.8 or above.
+    var canUseTopicIds = metadataVersion.isTopicIdsSupported
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpoch)
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
+
+      // Both the topic name and the topic id are set here because at this stage
+      // we don't know which version of the request will be used.
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)
       message.topics.add(topicData)
+
       items.foreach { item =>
         val partitionData = new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(item.topicPartition.partition)
+          .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
-        if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+        if (metadataVersion.isLeaderRecoverySupported) {
           partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
         }
 
         topicData.partitions.add(partitionData)
       }
     }
-    message
+
+    // If we cannot use topic ids, the builder will ensure that no version higher than 1 is used.
+    (new AlterPartitionRequest.Builder(message, canUseTopicIds), topicNamesByIds)
   }
 
   def handleAlterPartitionResponse(
+    requestHeader: RequestHeader,
     alterPartitionResp: AlterPartitionResponse,
     sentBrokerEpoch: Long,
-    inflightAlterPartitionItems: Seq[AlterPartitionItem]
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+    topicNamesByIds: mutable.Map[Uuid, String]
   ): Errors = {
     val data = alterPartitionResp.data
 
@@ -260,31 +318,37 @@ class DefaultAlterPartitionManager(
 
       case Errors.NONE =>
         // Collect partition-level responses to pass to the callbacks
-        val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+        val partitionResponses = new mutable.HashMap[TopicIdPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions.forEach { partition =>
-            val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val apiError = Errors.forCode(partition.errorCode)
-            debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
-            if (apiError == Errors.NONE) {
-              LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match {
-                case Some(leaderRecoveryState) =>
-                  partitionResponses(tp) = Right(
-                    LeaderAndIsr(
-                      partition.leaderId,
-                      partition.leaderEpoch,
-                      partition.isr.asScala.toList.map(_.toInt),
-                      leaderRecoveryState,
-                      partition.partitionEpoch
+          // Topic IDs are used since version 2 of the AlterPartition API.
+          val topicName = if (requestHeader.apiVersion > 1) topicNamesByIds.get(topic.topicId).orNull else topic.topicName
+          if (topicName == null || topicName.isEmpty) {
+            error(s"Received an unexpected topic $topic in the alter partition response, ignoring it.")
+          } else {
+            topic.partitions.forEach { partition =>
+              val tp = new TopicIdPartition(topic.topicId, partition.partitionIndex, topicName)
+              val apiError = Errors.forCode(partition.errorCode)
+              debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
+              if (apiError == Errors.NONE) {
+                LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match {
+                  case Some(leaderRecoveryState) =>
+                    partitionResponses(tp) = Right(
+                      LeaderAndIsr(
+                        partition.leaderId,
+                        partition.leaderEpoch,
+                        partition.isr.asScala.toList.map(_.toInt),
+                        leaderRecoveryState,
+                        partition.partitionEpoch
+                      )
                     )
-                  )
 
-                case None =>
-                  error(s"Controller returned an invalid leader recovery state (${partition.leaderRecoveryState}) for $tp: $partition")
-                  partitionResponses(tp) = Left(Errors.UNKNOWN_SERVER_ERROR)
+                  case None =>
+                    error(s"Controller returned an invalid leader recovery state (${partition.leaderRecoveryState}) for $tp: $partition")
+                    partitionResponses(tp) = Left(Errors.UNKNOWN_SERVER_ERROR)
+                }
+              } else {
+                partitionResponses(tp) = Left(apiError)
               }
-            } else {
-              partitionResponses(tp) = Left(apiError)
             }
           }
         }
@@ -293,7 +357,7 @@ class DefaultAlterPartitionManager(
         // partition was somehow erroneously excluded from the response. Note that these callbacks are run from
         // the leaderIsrUpdateLock write lock in Partition#sendAlterPartitionRequest
         inflightAlterPartitionItems.foreach { inflightAlterPartition =>
-          partitionResponses.get(inflightAlterPartition.topicPartition) match {
+          partitionResponses.get(inflightAlterPartition.topicIdPartition) match {
             case Some(leaderAndIsrOrError) =>
               try {
                 leaderAndIsrOrError match {
@@ -302,11 +366,11 @@ class DefaultAlterPartitionManager(
                 }
               } finally {
                 // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates
-                unsentIsrUpdates.remove(inflightAlterPartition.topicPartition)
+                unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
               }
             case None =>
               // Don't remove this partition from the update map so it will get re-sent
-              warn(s"Partition ${inflightAlterPartition.topicPartition} was sent but not included in the response")
+              warn(s"Partition ${inflightAlterPartition.topicIdPartition} was sent but not included in the response")
           }
         }
 
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 0aaab0aefb..85ae7d70b6 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -153,8 +153,8 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleDeleteTopics(request: RequestChannel.Request): Unit = {
     val deleteTopicsRequest = request.body[DeleteTopicsRequest]
-    val context = new ControllerRequestContext(request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data().timeoutMs()))
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs))
     val future = deleteTopics(context,
       deleteTopicsRequest.data,
       request.context.apiVersion,
@@ -317,10 +317,10 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleCreateTopics(request: RequestChannel.Request): Unit = {
     val createTopicsRequest = request.body[CreateTopicsRequest]
-    val context = new ControllerRequestContext(request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data().timeoutMs()))
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs))
     val future = createTopics(context,
-        createTopicsRequest.data(),
+        createTopicsRequest.data,
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity),
         names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
@@ -429,7 +429,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleLegacyAlterConfigs(request: RequestChannel.Request): Unit = {
     val response = new AlterConfigsResponseData()
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val context = new ControllerRequestContext(request.context.principal, OptionalLong.empty())
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
     val duplicateResources = new util.HashSet[ConfigResource]
     val configChanges = new util.HashMap[ConfigResource, util.Map[String, String]]()
     alterConfigsRequest.data.resources.forEach { resource =>
@@ -508,8 +508,8 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleElectLeaders(request: RequestChannel.Request): Unit = {
     authHelper.authorizeClusterOperation(request, ALTER)
     val electLeadersRequest = request.body[ElectLeadersRequest]
-    val context = new ControllerRequestContext(request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, electLeadersRequest.data().timeoutMs()))
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, electLeadersRequest.data.timeoutMs))
     val future = controller.electLeaders(context, electLeadersRequest.data)
     future.whenComplete { (responseData, exception) =>
       if (exception != null) {
@@ -526,7 +526,7 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
     val alterPartitionRequest = request.body[AlterPartitionRequest]
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val future = controller.alterPartition(context, alterPartitionRequest.data)
@@ -543,7 +543,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
     val heartbeatRequest = request.body[BrokerHeartbeatRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs))
     controller.processBrokerHeartbeat(context, heartbeatRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
@@ -570,10 +570,10 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleUnregisterBroker(request: RequestChannel.Request): Unit = {
     val decommissionRequest = request.body[UnregisterBrokerRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
 
-    controller.unregisterBroker(context, decommissionRequest.data().brokerId()).handle[Unit] { (_, e) =>
+    controller.unregisterBroker(context, decommissionRequest.data.brokerId).handle[Unit] { (_, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  e: Throwable): UnregisterBrokerResponse = {
         if (e != null) {
@@ -593,7 +593,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
     val registrationRequest = request.body[BrokerRegistrationRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
 
     controller.registerBroker(context, registrationRequest.data).handle[Unit] { (reply, e) =>
@@ -634,7 +634,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
     val quotaRequest = request.body[AlterClientQuotasRequest]
     authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
     controller.alterClientQuotas(context, quotaRequest.entries, quotaRequest.validateOnly)
       .whenComplete { (results, exception) =>
@@ -650,7 +650,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
     val response = new IncrementalAlterConfigsResponseData()
     val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
     val duplicateResources = new util.HashSet[ConfigResource]
     val configChanges = new util.HashMap[ConfigResource,
@@ -716,8 +716,8 @@ class ControllerApis(val requestChannel: RequestChannel,
       authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
     }
     val createPartitionsRequest = request.body[CreatePartitionsRequest]
-    val context = new ControllerRequestContext(request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, createPartitionsRequest.data().timeoutMs()))
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, createPartitionsRequest.data.timeoutMs))
     val future = createPartitions(context,
       createPartitionsRequest.data(),
       filterAlterAuthorizedTopics)
@@ -776,9 +776,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleAlterPartitionReassignments(request: RequestChannel.Request): Unit = {
     val alterRequest = request.body[AlterPartitionReassignmentsRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
-    val context = new ControllerRequestContext(request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, alterRequest.data().timeoutMs()))
-    val response = controller.alterPartitionReassignments(context, alterRequest.data()).get()
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, alterRequest.data.timeoutMs))
+    val response = controller.alterPartitionReassignments(context, alterRequest.data).get()
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
       new AlterPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
@@ -786,9 +786,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleListPartitionReassignments(request: RequestChannel.Request): Unit = {
     val listRequest = request.body[ListPartitionReassignmentsRequest]
     authHelper.authorizeClusterOperation(request, DESCRIBE)
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
-    val response = controller.listPartitionReassignments(context, listRequest.data()).get()
+    val response = controller.listPartitionReassignments(context, listRequest.data).get()
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
       new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
@@ -796,7 +796,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
     val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
         OptionalLong.empty())
     controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
       .whenComplete((results, exception) => {
@@ -814,7 +814,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
     val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
-    val context = new ControllerRequestContext(request.context.principal,
+    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
     controller.updateFeatures(context, updateFeaturesRequest.data)
       .whenComplete((response, exception) => {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e2537f4e2f..b128db596c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3306,9 +3306,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendResponseExemptThrottle(request, alterPartitionRequest.getErrorResponse(
         AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception))
     else
-      zkSupport.controller.alterPartitions(alterPartitionRequest.data, alterPartitionResp =>
-        requestHelper.sendResponseExemptThrottle(request, new AlterPartitionResponse(alterPartitionResp))
-      )
+      zkSupport.controller.alterPartitions(alterPartitionRequest.data, request.context.apiVersion, alterPartitionResp =>
+        requestHelper.sendResponseExemptThrottle(request, new AlterPartitionResponse(alterPartitionResp)))
   }
 
   def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala b/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
index c906ad6a70..c3d842b796 100644
--- a/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
@@ -19,10 +19,11 @@ package kafka.server
 import kafka.utils.{Logging, ReplicationUtils, Scheduler}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
+
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.{CompletableFuture, TimeUnit}
-
 import kafka.api.LeaderAndIsr
+import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors.InvalidUpdateVersionException
 import org.apache.kafka.common.utils.Time
 
@@ -58,21 +59,21 @@ class ZkAlterPartitionManager(scheduler: Scheduler, time: Time, zkClient: KafkaZ
   }
 
   override def submit(
-    topicPartition: TopicPartition,
+    topicIdPartition: TopicIdPartition,
     leaderAndIsr: LeaderAndIsr,
     controllerEpoch: Int
   ): CompletableFuture[LeaderAndIsr]= {
     debug(s"Writing new ISR ${leaderAndIsr.isr} to ZooKeeper with version " +
-      s"${leaderAndIsr.partitionEpoch} for partition $topicPartition")
+      s"${leaderAndIsr.partitionEpoch} for partition $topicIdPartition")
 
-    val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition,
+    val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicIdPartition.topicPartition,
       leaderAndIsr, controllerEpoch)
 
     val future = new CompletableFuture[LeaderAndIsr]()
     if (updateSucceeded) {
       // Track which partitions need to be propagated to the controller
       isrChangeSet synchronized {
-        isrChangeSet += topicPartition
+        isrChangeSet += topicIdPartition.topicPartition
         lastIsrChangeMs.set(time.milliseconds())
       }
 
@@ -81,7 +82,7 @@ class ZkAlterPartitionManager(scheduler: Scheduler, time: Time, zkClient: KafkaZ
       future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
     } else {
       future.completeExceptionally(new InvalidUpdateVersionException(
-        s"ISR update $leaderAndIsr for partition $topicPartition with controller epoch $controllerEpoch " +
+        s"ISR update $leaderAndIsr for partition $topicIdPartition with controller epoch $controllerEpoch " +
           "failed with an invalid version error"))
     }
     future
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index e3270fa70a..ae2e652357 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -199,7 +199,15 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
   override def getTopicName(topicId: Uuid): Option[String] = _currentImage.topics().topicsById.asScala.get(topicId).map(_.name())
 
   override def hasAliveBroker(brokerId: Int): Boolean = {
-    Option(_currentImage.cluster().broker(brokerId)).count(!_.fenced()) == 1
+    Option(_currentImage.cluster.broker(brokerId)).count(!_.fenced()) == 1
+  }
+
+  def isBrokerFenced(brokerId: Int): Boolean = {
+    Option(_currentImage.cluster.broker(brokerId)).count(_.fenced) == 1
+  }
+
+  def isBrokerShuttingDown(brokerId: Int): Boolean = {
+    Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
   }
 
   override def getAliveBrokers(): Iterable[BrokerMetadata] = getAliveBrokers(_currentImage)
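
These two accessors are what lets the partition leader skip proposing an ineligible replica
in the first place: per the commit message, the leader verifies eligibility before trying to
add a follower back to the ISR, and the PartitionTest cases below exercise exactly that path.
A hedged sketch of such a check, not the actual Partition.scala code (which is outside this
excerpt); the helper name is illustrative only:

    import kafka.server.metadata.KRaftMetadataCache

    // A replica is eligible to re-join the ISR only if its broker is neither fenced nor in
    // controlled shutdown; in the tests below this only gates ISR expansion.
    def isReplicaIsrEligible(metadataCache: KRaftMetadataCache, followerId: Int): Boolean =
      !metadataCache.isBrokerFenced(followerId) &&
        !metadataCache.isBrokerShuttingDown(followerId)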
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 147743a77d..13e627b529 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -65,7 +65,7 @@ class AbstractPartitionTest {
 
     val logProps = createLogProperties(Map.empty)
     logConfig = LogConfig(logProps)
-    configRepository = MockConfigRepository.forTopic(topicPartition.topic(), logProps)
+    configRepository = MockConfigRepository.forTopic(topicPartition.topic, logProps)
 
     tmpDir = TestUtils.tempDir()
     logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 1cf66a9b4c..6c374fe3c1 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -20,7 +20,6 @@ package kafka.cluster
 import java.util.{Optional, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
-
 import kafka.api.LeaderAndIsr
 import kafka.log._
 import kafka.server._
@@ -28,6 +27,7 @@ import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.metadata.MockConfigRepository
 import kafka.utils._
+import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
@@ -276,10 +276,13 @@ class PartitionLockTest extends Logging {
       logManager,
       alterIsrManager) {
 
-      override def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
+      override def prepareIsrShrink(
+        currentState: CommittedPartitionState,
+        outOfSyncReplicaIds: Set[Int]
+      ): PendingShrinkIsr = {
         shrinkIsrSemaphore.acquire()
         try {
-          super.prepareIsrShrink(outOfSyncReplicaIds)
+          super.prepareIsrShrink(currentState, outOfSyncReplicaIds)
         } finally {
           shrinkIsrSemaphore.release()
         }
@@ -319,12 +322,14 @@ class PartitionLockTest extends Logging {
         new SlowLog(log, offsets.logStartOffset, localLog, leaderEpochCache, producerStateManager, appendSemaphore)
       }
     }
+
+    val topicIdPartition = new TopicIdPartition(partition.topicId.getOrElse(Uuid.ZERO_UUID), topicPartition)
     when(offsetCheckpoints.fetch(
       ArgumentMatchers.anyString,
       ArgumentMatchers.eq(topicPartition)
     )).thenReturn(None)
     when(alterIsrManager.submit(
-      ArgumentMatchers.eq(topicPartition),
+      ArgumentMatchers.eq(topicIdPartition),
       ArgumentMatchers.any[LeaderAndIsr],
       ArgumentMatchers.anyInt()
     )).thenReturn(new CompletableFuture[LeaderAndIsr]())
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 04d2b15c60..7dcead062b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CountDownLatch, Semaphore}
 import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
@@ -1332,6 +1333,202 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
+  @Test
+  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List(brokerId, remoteBrokerId)
+    val isr = Set(brokerId)
+
+    val metadataCache = mock(classOf[KRaftMetadataCache])
+    val partition = new Partition(
+      topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = MetadataVersion.latest,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager
+    )
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catch up to the log end offset and
+    // to check if an expansion is possible.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Follower fetches and catches up to the log end offset.
+    assertReplicaState(partition, remoteBrokerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset
+    )
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Controller rejects the expansion because the broker is fenced.
+    alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
+
+    // The leader reverts back to the previous ISR.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertFalse(partition.partitionState.isInflight)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The leader eventually learns about the fenced broker.
+    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is not triggered because the follower is fenced.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertFalse(partition.partitionState.isInflight)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The broker is eventually unfenced.
+    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(false)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertTrue(partition.partitionState.isInflight)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Expansion succeeds.
+    alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)
+
+    // ISR is committed.
+    assertEquals(replicas.toSet, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertFalse(partition.partitionState.isInflight)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+  }
+
+  @Test
+  def testIsrNotExpandedIfReplicaIsInControlledShutdown(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List(brokerId, remoteBrokerId)
+    val isr = Set(brokerId)
+
+    val metadataCache = mock(classOf[KRaftMetadataCache])
+    val partition = new Partition(
+      topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = MetadataVersion.latest,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager
+    )
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catch up to the log end offset and
+    // to check if an expansion is possible.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Follower fetches and catches up to the log end offset.
+    assertReplicaState(partition, remoteBrokerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset
+    )
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Controller rejects the expansion because the broker is in controlled shutdown.
+    alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
+
+    // The leader reverts back to the previous ISR.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertFalse(partition.partitionState.isInflight)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The leader eventually learns that the broker is in controlled shutdown.
+    when(metadataCache.isBrokerShuttingDown(remoteBrokerId)).thenReturn(true)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is not triggered because the follower is in controlled shutdown.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertFalse(partition.partitionState.isInflight)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The broker eventually comes back.
+    when(metadataCache.isBrokerShuttingDown(remoteBrokerId)).thenReturn(false)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertTrue(partition.partitionState.isInflight)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Expansion succeeds.
+    alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)
+
+    // ISR is committed.
+    assertEquals(replicas.toSet, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertFalse(partition.partitionState.isInflight)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+  }
+
   @Test
   def testRetryShrinkIsr(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
@@ -1652,6 +1849,16 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0L, partition.localLogOrException.highWatermark)
   }
 
+  @Test
+  def testAlterIsrNewLeaderElected(): Unit = {
+    handleAlterIsrFailure(Errors.NEW_LEADER_ELECTED,
+      (brokerId: Int, remoteBrokerId: Int, partition: Partition) => {
+        assertEquals(partition.partitionState.isr, Set(brokerId))
+        assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId))
+        assertEquals(alterPartitionManager.isrUpdates.size, 0)
+      })
+  }
+
   @Test
   def testAlterIsrUnknownTopic(): Unit = {
     handleAlterIsrFailure(Errors.UNKNOWN_TOPIC_OR_PARTITION,
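
The new testAlterIsrNewLeaderElected case above, together with the fenced and
controlled-shutdown tests earlier in this file, pins down how the leader reacts to the two
new error codes. A rough summary of what those tests assert, expressed as an illustrative
sketch rather than the actual Partition.scala logic (which is outside this excerpt):

    import org.apache.kafka.common.protocol.Errors

    def describeAlterPartitionRejection(error: Errors): String = error match {
      case Errors.INELIGIBLE_REPLICA =>
        "pending update dropped; isr and maximalIsr revert to the last committed ISR, " +
          "so a later follower fetch can trigger a fresh expansion"
      case Errors.NEW_LEADER_ELECTED =>
        "pending update dropped without retry; maximalIsr keeps the expanded set while " +
          "this replica waits to learn about the new leader"
      case other =>
        s"$other is handled as before this patch"
    }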
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index d49502dd62..57cbeafd4d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -19,14 +19,17 @@ package kafka.controller
 
 import java.util.Properties
 import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
+import java.util.stream.{Stream => JStream}
 import com.yammer.metrics.core.Timer
 import kafka.api.LeaderAndIsr
-import kafka.controller.KafkaController.AlterPartitionCallback
 import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
 import kafka.utils.{LogCaptureAppender, TestUtils}
 import kafka.zk.{FeatureZNodeStatus, _}
 import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
+import org.apache.kafka.common.message.AlterPartitionRequestData
+import org.apache.kafka.common.message.AlterPartitionResponseData
 import org.apache.kafka.common.metrics.KafkaMetric
+import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
@@ -36,6 +39,9 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.log4j.Level
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.Arguments
+import org.junit.jupiter.params.provider.MethodSource
 import org.mockito.Mockito.{doAnswer, spy, verify}
 import org.mockito.invocation.InvocationOnMock
 
@@ -43,6 +49,16 @@ import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 
+object ControllerIntegrationTest {
+  def testAlterPartitionSource(): JStream[Arguments] = {
+    Seq(MetadataVersion.IBP_2_7_IV0, MetadataVersion.latest).asJava.stream.flatMap { metadataVersion =>
+      ApiKeys.ALTER_PARTITION.allVersions.stream.map { alterPartitionVersion =>
+        Arguments.of(metadataVersion, alterPartitionVersion)
+      }
+    }
+  }
+}
+
 class ControllerIntegrationTest extends QuorumTestHarness {
   var servers = Seq.empty[KafkaServer]
   val firstControllerEpoch = KafkaController.InitialControllerEpoch + 1
@@ -846,6 +862,135 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionSource"))
+  def testAlterPartition(metadataVersion: MetadataVersion, alterPartitionVersion: Short): Unit = {
+    if (!metadataVersion.isTopicIdsSupported && alterPartitionVersion > 1) {
+      // This combination is not valid. We cannot use AlterPartition version > 1
+      // if the broker is on an IBP < 2.8 because topics don't have ids in this case.
+      return
+    }
+
+    servers = makeServers(1, interBrokerProtocolVersion = Some(metadataVersion))
+
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds.getOrElse(tp.topic, Uuid.ZERO_UUID)
+    val brokerId = controllerId
+    val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // The caller of the AlterPartition API can only use topic ids iff 1) the controller is
+    // on IBP >= 2.8 and 2) AlterPartition version 2 or above is used.
+    val canCallerUseTopicIds = metadataVersion.isTopicIdsSupported && alterPartitionVersion > 1
+
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(brokerId)
+      .setBrokerEpoch(brokerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicName(if (!canCallerUseTopicIds) tp.topic else "")
+        .setTopicId(if (canCallerUseTopicIds) topicId else Uuid.ZERO_UUID)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))
+
+    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+        .setTopicName(if (!canCallerUseTopicIds) tp.topic else "")
+        .setTopicId(if (canCallerUseTopicIds) topicId else Uuid.ZERO_UUID)
+        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderId(brokerId)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testAlterPartitionVersion2KeepWorkingWhenControllerDowngradeToPre28IBP(): Unit = {
+    // When the controller downgrades from IBP >= 2.8 to IBP < 2.8, it does not assign
+    // topic ids anymore. However, the already assigned topic ids are kept. This means
+    // that using AlterPartition version 2 should still work assuming that it only
+    // contains topics with topic ids.
+    servers = makeServers(1, interBrokerProtocolVersion = Some(MetadataVersion.latest))
+
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Downgrade controller to IBP 2.7
+    servers(0).shutdown()
+    servers(0).awaitShutdown()
+    servers = makeServers(1, interBrokerProtocolVersion = Some(IBP_2_7_IV0))
+    TestUtils.waitUntilControllerElected(zkClient)
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds.getOrElse(tp.topic, Uuid.ZERO_UUID)
+    val brokerId = controllerId
+    val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(brokerId)
+      .setBrokerEpoch(brokerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      ApiKeys.ALTER_PARTITION.latestVersion,
+      future.complete
+    ))
+
+    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderId(brokerId)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+  }
+
   @Test
   def testIdempotentAlterPartition(): Unit = {
     servers = makeServers(2)
@@ -855,29 +1000,49 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
 
-    val latch = new CountDownLatch(1)
     val controller = getController().kafkaController
-
     val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
     val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val brokerId = otherBroker.config.brokerId
+    val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId)
 
-    val callback = (result: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]) => {
-      result match {
-        case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
-          partitionResults.get(tp) match {
-            case Some(Left(error: Errors)) => throw new AssertionError(s"Should not have seen error for $tp")
-            case Some(Right(leaderAndIsr: LeaderAndIsr)) => assertEquals(leaderAndIsr, newLeaderAndIsr, "ISR should remain unchanged")
-            case None => throw new AssertionError(s"Should have seen $tp in result")
-          }
-        case Right(_: Errors) => throw new AssertionError("Should not have had top-level error here")
-      }
-      latch.countDown()
-    }
-
-    val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs.get(otherBroker.config.brokerId).get
     // When re-sending the current ISR, we should not get an error or any ISR changes
-    controller.eventManager.put(AlterPartitionReceived(otherBroker.config.brokerId, brokerEpoch, Map(tp -> newLeaderAndIsr), callback))
-    latch.await()
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(brokerId)
+      .setBrokerEpoch(brokerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+      future.complete
+    ))
+
+    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+        .setTopicId(topicId)
+        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setLeaderId(brokerId)
+          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+          .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+        ).asJava)
+      ).asJava)
+
+    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
   @Test
@@ -895,20 +1060,23 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     val leaderBrokerEpoch = servers(leaderId).kafkaController.brokerEpoch
     val leaderEpoch = partitionState.leaderAndIsr.leaderEpoch
     val partitionEpoch = partitionState.leaderAndIsr.partitionEpoch
+    val topicId = controller.controllerContext.topicIds.get(tp.topic)
 
     def assertAlterPartition(
       topLevelError: Errors = Errors.NONE,
       partitionError: Errors = Errors.NONE,
       topicPartition: TopicPartition = tp,
+      topicIdOpt: Option[Uuid] = topicId,
       leaderId: Int = leaderId,
       brokerEpoch: Long = leaderBrokerEpoch,
       leaderEpoch: Int = leaderEpoch,
       partitionEpoch: Int = partitionEpoch,
       isr: Set[Int] = replicas.toSet,
-      leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+      leaderRecoveryState: Byte = LeaderRecoveryState.RECOVERED.value
     ): Unit = {
       assertAlterPartitionError(
         topicPartition = topicPartition,
+        topicIdOpt = topicIdOpt,
         leaderId = leaderId,
         brokerEpoch = brokerEpoch,
         leaderEpoch = leaderEpoch,
@@ -930,14 +1098,22 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       leaderId = 99,
     )
 
+    assertAlterPartition(
+      partitionError = Errors.UNKNOWN_TOPIC_ID,
+      topicPartition = tp,
+      topicIdOpt = Some(Uuid.randomUuid())
+    )
+
     assertAlterPartition(
       partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
-      topicPartition = new TopicPartition("unknown", 0)
+      topicPartition = new TopicPartition("unknown", 0),
+      topicIdOpt = None
     )
 
     assertAlterPartition(
       partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
-      topicPartition = new TopicPartition(tp.topic, 1)
+      topicPartition = new TopicPartition(tp.topic, 1),
+      topicIdOpt = None
     )
 
     assertAlterPartition(
@@ -957,12 +1133,12 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
     assertAlterPartition(
       partitionError = Errors.INVALID_REQUEST,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )
 
     assertAlterPartition(
       partitionError = Errors.INVALID_REQUEST,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
       isr = Set(controllerId)
     )
 
@@ -971,21 +1147,27 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
     assertAlterPartition(
       partitionError = Errors.INVALID_UPDATE_VERSION,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
       partitionEpoch = partitionEpoch - 1
     )
 
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
       leaderEpoch = leaderEpoch - 1
     )
 
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
       leaderEpoch = leaderEpoch + 1
     )
+
+    // Validate that unexpected exceptions are handled correctly.
+    assertAlterPartition(
+      topLevelError = Errors.UNKNOWN_SERVER_ERROR,
+      leaderRecoveryState = 25, // Invalid recovery state.
+    )
   }
 
   @Test
@@ -1008,6 +1190,8 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     val replica1 :: replica2 :: Nil = replicas
 
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val topicIdOpt = controller.controllerContext.topicIds.get(tp.topic)
+
     servers(replica1).shutdown()
     servers(replica1).awaitShutdown()
 
@@ -1042,10 +1226,11 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       brokerEpoch: Long = leaderBrokerEpoch,
       leaderEpoch: Int = leaderEpoch,
       partitionEpoch: Int = partitionEpoch,
-      leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+      leaderRecoveryState: Byte = LeaderRecoveryState.RECOVERED.value
     ): Unit = {
       assertAlterPartitionError(
         topicPartition = tp,
+        topicIdOpt = topicIdOpt,
         leaderId = leaderId,
         brokerEpoch = brokerEpoch,
         leaderEpoch = leaderEpoch,
@@ -1084,7 +1269,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
     assertAlterPartition(
       partitionError = Errors.INVALID_REQUEST,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )
 
     // Version/epoch errors take precedence over other validations since
@@ -1093,86 +1278,70 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertAlterPartition(
       partitionError = Errors.INVALID_UPDATE_VERSION,
       partitionEpoch = partitionEpoch - 1,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )
 
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderEpoch = leaderEpoch - 1,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )
 
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderEpoch = leaderEpoch + 1,
-      leaderRecoveryState = LeaderRecoveryState.RECOVERING
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )
   }
 
   def assertAlterPartitionError(
     topicPartition: TopicPartition,
+    topicIdOpt: Option[Uuid],
     leaderId: Int,
     brokerEpoch: Long,
     leaderEpoch: Int,
     partitionEpoch: Int,
     isr: Set[Int],
-    leaderRecoveryState: LeaderRecoveryState,
+    leaderRecoveryState: Byte,
     topLevelError: Errors,
     partitionError: Errors,
   ): Unit = {
-    val leaderAndIsr = LeaderAndIsr(
-      leader = leaderId,
-      leaderEpoch = leaderEpoch,
-      isr = isr.toList,
-      partitionEpoch = partitionEpoch,
-      leaderRecoveryState = leaderRecoveryState
-    )
-
-    val future = captureAlterPartitionError(
-      brokerId = leaderId,
-      brokerEpoch = brokerEpoch,
-      topicPartition = topicPartition,
-      leaderAndIsr = leaderAndIsr
-    )
-
-    val errors = future.get(10, TimeUnit.SECONDS)
-    assertEquals(topLevelError, errors.topLevelError)
-
-    if (topLevelError == Errors.NONE) {
-      assertEquals(Some(partitionError), errors.partitionError)
-    }
-  }
-
-  private case class AlterPartitionError(topLevelError: Errors, partitionError: Option[Errors])
-
-  private def captureAlterPartitionError(
-    brokerId: Int,
-    brokerEpoch: Long,
-    topicPartition: TopicPartition,
-    leaderAndIsr: LeaderAndIsr
-  ): CompletableFuture[AlterPartitionError] = {
-    val future = new CompletableFuture[AlterPartitionError]()
-    val callback: AlterPartitionCallback = {
-      case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
-        partitionResults.get(topicPartition) match {
-          case Some(Left(error: Errors)) =>
-            future.complete(AlterPartitionError(topLevelError = Errors.NONE, partitionError = Some(error)))
-
-          case Some(Right(_: LeaderAndIsr)) =>
-            future.complete(AlterPartitionError(topLevelError = Errors.NONE, partitionError = Some(Errors.NONE)))
-
-          case None =>
-            future.completeExceptionally(new AssertionError(s"Should have seen $topicPartition in result"))
-        }
-
-      case Right(error: Errors) =>
-        future.complete(AlterPartitionError(topLevelError = error, partitionError = None))
+    val topicName = if (topicIdOpt.isEmpty) topicPartition.topic else ""
+    val topicId = topicIdOpt.getOrElse(Uuid.ZERO_UUID)
+
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(leaderId)
+      .setBrokerEpoch(brokerEpoch)
+      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+        .setTopicId(topicId)
+        .setTopicName(topicName)
+        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+          .setPartitionIndex(topicPartition.partition)
+          .setLeaderEpoch(leaderEpoch)
+          .setPartitionEpoch(partitionEpoch)
+          .setNewIsr(isr.toList.map(Int.box).asJava)
+          .setLeaderRecoveryState(leaderRecoveryState)).asJava)).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    getController().kafkaController.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      if (topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 1,
+      future.complete
+    ))
+
+    val expectedAlterPartitionResponse = if (topLevelError != Errors.NONE) {
+      new AlterPartitionResponseData().setErrorCode(topLevelError.code)
+    } else {
+      new AlterPartitionResponseData()
+        .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+          .setTopicId(topicId)
+          .setTopicName(topicName)
+          .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+            .setPartitionIndex(topicPartition.partition)
+            .setErrorCode(partitionError.code)).asJava)).asJava)
     }
 
-    val partitionsToAlter = Map(topicPartition -> leaderAndIsr)
-    val controller = getController().kafkaController
-    controller.eventManager.put(AlterPartitionReceived(brokerId, brokerEpoch, partitionsToAlter, callback))
-    future
+    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 29aebd55cd..af1762e319 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -20,7 +20,6 @@ package kafka.integration
 import java.io.File
 import java.util
 import java.util.Arrays
-
 import kafka.server.QuorumTestHarness
 import kafka.server._
 import kafka.utils.TestUtils
@@ -30,13 +29,12 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 import java.util.Properties
-
 import kafka.utils.TestUtils.{createAdminClient, resource}
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.controller.ControllerRequestContext.ANONYMOUS_CONTEXT
+import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
 
 /**
  * A test harness that brings up some number of broker nodes
diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index dd3bfdd62d..c35bdc0193 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -19,16 +19,18 @@ package kafka.server
 
 import java.util.Collections
 import java.util.stream.{Stream => JStream}
-
 import kafka.api.LeaderAndIsr
 import kafka.utils.{MockScheduler, MockTime}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.{AuthenticationException, InvalidUpdateVersionException, OperationNotAttemptedException, UnknownServerException, UnsupportedVersionException}
 import org.apache.kafka.common.message.AlterPartitionResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.AbstractResponse
+import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.requests.{AbstractRequest, AlterPartitionRequest, AlterPartitionResponse}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
@@ -40,6 +42,7 @@ import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments
 import org.junit.jupiter.params.provider.MethodSource
+import org.mockito.ArgumentMatcher
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mock, reset, times, verify}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
@@ -49,15 +52,16 @@ import scala.jdk.CollectionConverters._
 class AlterPartitionManagerTest {
 
   val topic = "test-topic"
+  val topicId = Uuid.randomUuid()
   val time = new MockTime
   val metrics = new Metrics
   val brokerId = 1
 
   var brokerToController: BrokerToControllerChannelManager = _
 
-  val tp0 = new TopicPartition(topic, 0)
-  val tp1 = new TopicPartition(topic, 1)
-  val tp2 = new TopicPartition(topic, 2)
+  val tp0 = new TopicIdPartition(topicId, 0, topic)
+  val tp1 = new TopicIdPartition(topicId, 1, topic)
+  val tp2 = new TopicIdPartition(topicId, 2, topic)
 
   @BeforeEach
   def setup(): Unit = {
@@ -68,9 +72,9 @@ class AlterPartitionManagerTest {
   @MethodSource(Array("provideMetadataVersions"))
   def testBasic(metadataVersion: MetadataVersion): Unit = {
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
-    alterIsrManager.start()
-    alterIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
+    alterPartitionManager.start()
+    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), any())
   }
@@ -84,9 +88,9 @@ class AlterPartitionManagerTest {
     val requestCapture = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
-    alterIsrManager.start()
-    alterIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1), leaderRecoveryState, 10), 0)
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
+    alterPartitionManager.start()
+    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1), leaderRecoveryState, 10), 0)
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(requestCapture.capture(), any())
 
@@ -98,30 +102,33 @@ class AlterPartitionManagerTest {
   @ParameterizedTest
   @MethodSource(Array("provideMetadataVersions"))
   def testOverwriteWithinBatch(metadataVersion: MetadataVersion): Unit = {
+    val canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
     val capture: ArgumentCaptor[AbstractRequest.Builder[AlterPartitionRequest]] = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
-    alterIsrManager.start()
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
+    alterPartitionManager.start()
 
     // Only send one ISR update for a given topic+partition
-    val firstSubmitFuture = alterIsrManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    val firstSubmitFuture = alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
     assertFalse(firstSubmitFuture.isDone)
 
-    val failedSubmitFuture = alterIsrManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2), LeaderRecoveryState.RECOVERED, 10), 0)
+    val failedSubmitFuture = alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2), LeaderRecoveryState.RECOVERED, 10), 0)
     assertTrue(failedSubmitFuture.isCompletedExceptionally)
     assertFutureThrows(failedSubmitFuture, classOf[OperationNotAttemptedException])
 
     // Simulate response
     val alterPartitionResp = partitionResponse(tp0, Errors.NONE)
-    val resp = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, alterPartitionResp)
+    val resp = makeClientResponse(
+      response = alterPartitionResp,
+      version = if (canUseTopicIds) ApiKeys.ALTER_PARTITION.latestVersion else 1
+    )
     verify(brokerToController).sendRequest(capture.capture(), callbackCapture.capture())
     callbackCapture.getValue.onComplete(resp)
 
     // Now we can submit this partition again
-    val newSubmitFuture = alterIsrManager.submit(tp0, LeaderAndIsr(1, 1, List(1), LeaderRecoveryState.RECOVERED, 10), 0)
+    val newSubmitFuture = alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1), LeaderRecoveryState.RECOVERED, 10), 0)
     assertFalse(newSubmitFuture.isDone)
 
     verify(brokerToController).start()
@@ -140,16 +147,16 @@ class AlterPartitionManagerTest {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
-    alterIsrManager.start()
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
+    alterPartitionManager.start()
 
     // First request will send batch of one
-    alterIsrManager.submit(new TopicPartition(topic, 0),
+    alterPartitionManager.submit(new TopicIdPartition(topicId, 0, topic),
       LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
 
     // Other submissions will queue up until a response
     for (i <- 1 to 9) {
-      alterIsrManager.submit(new TopicPartition(topic, i),
+      alterPartitionManager.submit(new TopicIdPartition(topicId, i, topic),
         LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
     }
 
@@ -200,8 +207,7 @@ class AlterPartitionManagerTest {
 
   private def testRetryOnTopLevelError(error: Errors): Unit = {
     val alterPartitionResp = new AlterPartitionResponse(new AlterPartitionResponseData().setErrorCode(error.code))
-    val response = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, alterPartitionResp)
+    val response = makeClientResponse(alterPartitionResp, ApiKeys.ALTER_PARTITION.latestVersion)
     testRetryOnErrorResponse(response)
   }
 
@@ -210,16 +216,16 @@ class AlterPartitionManagerTest {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
-    alterIsrManager.start()
-    alterIsrManager.submit(tp0, leaderAndIsr, 0)
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
+    alterPartitionManager.start()
+    alterPartitionManager.submit(tp0, leaderAndIsr, 0)
 
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), callbackCapture.capture())
     callbackCapture.getValue.onComplete(response)
 
     // Any top-level error, we want to retry, so we don't clear items from the pending map
-    assertTrue(alterIsrManager.unsentIsrUpdates.containsKey(tp0))
+    assertTrue(alterPartitionManager.unsentIsrUpdates.containsKey(tp0.topicPartition))
 
     reset(brokerToController)
 
@@ -229,13 +235,12 @@ class AlterPartitionManagerTest {
 
     // After a successful response, we can submit another AlterIsrItem
     val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE)
-    val retryResponse = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, retryAlterPartitionResponse)
+    val retryResponse = makeClientResponse(retryAlterPartitionResponse, ApiKeys.ALTER_PARTITION.latestVersion)
 
     verify(brokerToController).sendRequest(any(), callbackCapture.capture())
     callbackCapture.getValue.onComplete(retryResponse)
 
-    assertFalse(alterIsrManager.unsentIsrUpdates.containsKey(tp0))
+    assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0.topicPartition))
   }
 
   @Test
@@ -259,33 +264,32 @@ class AlterPartitionManagerTest {
   }
 
   private def checkPartitionError(error: Errors): Unit = {
-    val alterIsrManager = testPartitionError(tp0, error)
+    val alterPartitionManager = testPartitionError(tp0, error)
     // Any partition-level error should clear the item from the pending queue allowing for future updates
-    val future = alterIsrManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    val future = alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
     assertFalse(future.isDone)
   }
 
-  private def testPartitionError(tp: TopicPartition, error: Errors): AlterPartitionManager = {
+  private def testPartitionError(tp: TopicIdPartition, error: Errors): AlterPartitionManager = {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
     reset(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
-    alterIsrManager.start()
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
+    alterPartitionManager.start()
 
-    val future = alterIsrManager.submit(tp, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    val future = alterPartitionManager.submit(tp, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
 
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), callbackCapture.capture())
     reset(brokerToController)
 
     val alterPartitionResp = partitionResponse(tp, error)
-    val resp = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, alterPartitionResp)
+    val resp = makeClientResponse(alterPartitionResp, ApiKeys.ALTER_PARTITION.latestVersion)
     callbackCapture.getValue.onComplete(resp)
     assertTrue(future.isCompletedExceptionally)
     assertFutureThrows(future, error.exception.getClass)
-    alterIsrManager
+    alterPartitionManager
   }
 
   @ParameterizedTest
@@ -294,15 +298,15 @@ class AlterPartitionManagerTest {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
-    alterIsrManager.start()
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
+    alterPartitionManager.start()
 
     // First submit will send the request
-    alterIsrManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
 
     // These will become pending unsent items
-    alterIsrManager.submit(tp1, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
-    alterIsrManager.submit(tp2, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp1, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp2, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
 
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), callbackCapture.capture())
@@ -311,78 +315,209 @@ class AlterPartitionManagerTest {
     reset(brokerToController)
 
     val alterPartitionResp = new AlterPartitionResponse(new AlterPartitionResponseData())
-    val resp = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, alterPartitionResp)
+    val resp = makeClientResponse(alterPartitionResp, ApiKeys.ALTER_PARTITION.latestVersion)
     callbackCapture.getValue.onComplete(resp)
   }
 
   @ParameterizedTest
   @MethodSource(Array("provideMetadataVersions"))
   def testPartitionMissingInResponse(metadataVersion: MetadataVersion): Unit = {
-    brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
-
+    val expectedVersion = if (metadataVersion.isTopicIdsSupported) {
+      ApiKeys.ALTER_PARTITION.latestVersion
+    } else {
+      1.toShort
+    }
+    val leaderAndIsr = LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10)
+    val controlledEpoch = 0
     val brokerEpoch = 2
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => brokerEpoch, () => metadataVersion)
-    alterIsrManager.start()
-
-    def matchesAlterIsr(topicPartitions: Set[TopicPartition]): AbstractRequest.Builder[_ <: AbstractRequest] = {
-      ArgumentMatchers.argThat[AbstractRequest.Builder[_ <: AbstractRequest]] { request =>
-        assertEquals(ApiKeys.ALTER_PARTITION, request.apiKey())
-        val alterPartitionRequest = request.asInstanceOf[AlterPartitionRequest.Builder].build()
-
-        val requestTopicPartitions = alterPartitionRequest.data.topics.asScala.flatMap { topicData =>
-          val topic = topicData.name
-          topicData.partitions.asScala.map(partitionData => new TopicPartition(topic, partitionData.partitionIndex))
-        }.toSet
-
-        topicPartitions == requestTopicPartitions
-      }
-    }
-
-    def verifySendAlterIsr(topicPartitions: Set[TopicPartition]): ControllerRequestCompletionHandler = {
-      val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] =
-        ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-      Mockito.verify(brokerToController).sendRequest(
-        matchesAlterIsr(topicPartitions),
-        callbackCapture.capture()
-      )
-      Mockito.reset(brokerToController)
-      callbackCapture.getValue
-    }
-
-    def clientResponse(topicPartition: TopicPartition, error: Errors): ClientResponse = {
-      val alterIsrResponse = partitionResponse(topicPartition, error)
-      new ClientResponse(null, null, "", 0L, 0L,
-        false, null, null, alterIsrResponse)
-    }
+    val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      brokerToController,
+      scheduler,
+      time,
+      brokerId,
+      () => brokerEpoch,
+      () => metadataVersion
+    )
+    alterPartitionManager.start()
 
     // The first `submit` will send the `AlterIsr` request
-    val future1 = alterIsrManager.submit(tp0, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
-    val callback1 = verifySendAlterIsr(Set(tp0))
+    val future1 = alterPartitionManager.submit(tp0, leaderAndIsr, controlledEpoch)
+    val callback1 = verifySendRequest(brokerToController, alterPartitionRequestMatcher(
+      expectedTopicPartitions = Set(tp0),
+      expectedVersion = expectedVersion
+    ))
 
     // Additional calls while the `AlterIsr` request is inflight will be queued
-    val future2 = alterIsrManager.submit(tp1, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
-    val future3 = alterIsrManager.submit(tp2, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0)
+    val future2 = alterPartitionManager.submit(tp1, leaderAndIsr, controlledEpoch)
+    val future3 = alterPartitionManager.submit(tp2, leaderAndIsr, controlledEpoch)
 
     // Respond to the first request, which will also allow the next request to get sent
-    callback1.onComplete(clientResponse(tp0, Errors.UNKNOWN_SERVER_ERROR))
+    callback1.onComplete(makeClientResponse(
+      response = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR),
+      version = expectedVersion
+    ))
     assertFutureThrows(future1, classOf[UnknownServerException])
     assertFalse(future2.isDone)
     assertFalse(future3.isDone)
 
     // Verify the second request includes both expected partitions, but only respond with one of them
-    val callback2 = verifySendAlterIsr(Set(tp1, tp2))
-    callback2.onComplete(clientResponse(tp2, Errors.UNKNOWN_SERVER_ERROR))
+    val callback2 = verifySendRequest(brokerToController, alterPartitionRequestMatcher(
+      expectedTopicPartitions = Set(tp1, tp2),
+      expectedVersion = expectedVersion
+    ))
+    callback2.onComplete(makeClientResponse(
+      response = partitionResponse(tp2, Errors.UNKNOWN_SERVER_ERROR),
+      version = expectedVersion
+    ))
     assertFutureThrows(future3, classOf[UnknownServerException])
     assertFalse(future2.isDone)
 
     // The missing partition should be retried
-    val callback3 = verifySendAlterIsr(Set(tp1))
-    callback3.onComplete(clientResponse(tp1, Errors.UNKNOWN_SERVER_ERROR))
+    val callback3 = verifySendRequest(brokerToController, alterPartitionRequestMatcher(
+      expectedTopicPartitions = Set(tp1),
+      expectedVersion = expectedVersion
+    ))
+    callback3.onComplete(makeClientResponse(
+      response = partitionResponse(tp1, Errors.UNKNOWN_SERVER_ERROR),
+      version = expectedVersion
+    ))
     assertFutureThrows(future2, classOf[UnknownServerException])
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("provideMetadataVersions"))
+  def testPartialTopicIds(metadataVersion: MetadataVersion): Unit = {
+    val canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    val foo = new TopicIdPartition(Uuid.ZERO_UUID, 0, "foo")
+    val bar = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
+    val zar = new TopicIdPartition(Uuid.randomUuid(), 0, "zar")
+
+    val leaderAndIsr = LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10)
+    val controlledEpoch = 0
+    val brokerEpoch = 2
+    val scheduler = new MockScheduler(time)
+    val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      brokerToController,
+      scheduler,
+      time,
+      brokerId,
+      () => brokerEpoch,
+      () => metadataVersion
+    )
+    alterPartitionManager.start()
+
+    // Submits an alter isr update with zar, which has a topic id.
+    val future1 = alterPartitionManager.submit(zar, leaderAndIsr, controlledEpoch)
+
+    // The latest version is expected if all the submitted partitions
+    // have topic ids and IBP >= 2.8; version 1 should be used otherwise.
+    val callback1 = verifySendRequest(brokerToController, alterPartitionRequestMatcher(
+      expectedTopicPartitions = Set(zar),
+      expectedVersion = if (canUseTopicIds) ApiKeys.ALTER_PARTITION.latestVersion else 1
+    ))
+
+    // Submits two additional alter isr changes with foo and bar while the previous one
+    // is still inflight. foo has no topic id, bar has one.
+    val future2 = alterPartitionManager.submit(foo, leaderAndIsr, controlledEpoch)
+    val future3 = alterPartitionManager.submit(bar, leaderAndIsr, controlledEpoch)
+
+    // Completes the first request. That triggers the next one.
+    callback1.onComplete(makeClientResponse(
+      response = makeAlterPartition(Seq(makeAlterPartitionTopicData(zar, Errors.NONE))),
+      version = if (canUseTopicIds) ApiKeys.ALTER_PARTITION.latestVersion else 1
+    ))
+
+    assertTrue(future1.isDone)
+    assertFalse(future2.isDone)
+    assertFalse(future3.isDone)
+
+    // Version 1 is expected because foo does not have a topic id.
+    val callback2 = verifySendRequest(brokerToController, alterPartitionRequestMatcher(
+      expectedTopicPartitions = Set(foo, bar),
+      expectedVersion = 1
+    ))
+
+    // Completes the second request.
+    callback2.onComplete(makeClientResponse(
+      response = makeAlterPartition(Seq(
+        makeAlterPartitionTopicData(foo, Errors.NONE),
+        makeAlterPartitionTopicData(bar, Errors.NONE),
+      )),
+      version = 1
+    ))
+
+    assertTrue(future1.isDone)
+    assertTrue(future2.isDone)
+    assertTrue(future3.isDone)
+  }
+
+  private def verifySendRequest(
+    brokerToController: BrokerToControllerChannelManager,
+    expectedRequest: ArgumentMatcher[AbstractRequest.Builder[_ <: AbstractRequest]]
+  ): ControllerRequestCompletionHandler = {
+    val callbackCapture = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+
+    Mockito.verify(brokerToController).sendRequest(
+      ArgumentMatchers.argThat(expectedRequest),
+      callbackCapture.capture()
+    )
+
+    Mockito.reset(brokerToController)
+
+    callbackCapture.getValue
+  }
+
+  private def alterPartitionRequestMatcher(
+    expectedTopicPartitions: Set[TopicIdPartition],
+    expectedVersion: Short
+  ): ArgumentMatcher[AbstractRequest.Builder[_ <: AbstractRequest]] = {
+    request => {
+      assertEquals(ApiKeys.ALTER_PARTITION, request.apiKey)
+
+      val alterPartitionRequest = request.asInstanceOf[AlterPartitionRequest.Builder].build()
+      assertEquals(expectedVersion, alterPartitionRequest.version)
+
+      val requestTopicPartitions = alterPartitionRequest.data.topics.asScala.flatMap { topicData =>
+        topicData.partitions.asScala.map { partitionData =>
+          new TopicIdPartition(topicData.topicId, partitionData.partitionIndex, topicData.topicName)
+        }
+      }.toSet
+
+      expectedTopicPartitions == requestTopicPartitions
+    }
+  }
+
+  private def makeClientResponse(
+    response: AbstractResponse,
+    version: Short
+  ): ClientResponse = {
+    val requestHeader = new RequestHeader(response.apiKey, version, "", 0)
+    new ClientResponse(requestHeader, null, "", 0L, 0L,
+      false, null, null, response)
+  }
+
+  private def makeAlterPartition(
+    topics: Seq[AlterPartitionResponseData.TopicData]
+  ): AlterPartitionResponse = {
+    new AlterPartitionResponse(new AlterPartitionResponseData().setTopics(topics.asJava))
+  }
+
+  private def makeAlterPartitionTopicData(
+    topicIdPartition: TopicIdPartition,
+    error: Errors
+  ): AlterPartitionResponseData.TopicData = {
+    new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicIdPartition.topic)
+      .setTopicId(topicIdPartition.topicId)
+      .setPartitions(Collections.singletonList(
+        new AlterPartitionResponseData.PartitionData()
+          .setPartitionIndex(topicIdPartition.partition)
+          .setErrorCode(error.code)))
+  }
+
   @Test
   def testZkBasic(): Unit = {
     val scheduler = new MockScheduler(time)
@@ -410,11 +545,12 @@ class AlterPartitionManagerTest {
     assertFutureThrows(future2, classOf[InvalidUpdateVersionException])
   }
 
-  private def partitionResponse(tp: TopicPartition, error: Errors): AlterPartitionResponse = {
+  private def partitionResponse(tp: TopicIdPartition, error: Errors): AlterPartitionResponse = {
     new AlterPartitionResponse(new AlterPartitionResponseData()
       .setTopics(Collections.singletonList(
         new AlterPartitionResponseData.TopicData()
-          .setName(tp.topic())
+          .setTopicName(tp.topic)
+          .setTopicId(tp.topicId)
           .setPartitions(Collections.singletonList(
             new AlterPartitionResponseData.PartitionData()
               .setPartitionIndex(tp.partition())
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 9f0ae482c5..0c6f979874 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -52,8 +52,8 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{ElectionType, Uuid}
+import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
 import org.apache.kafka.controller.{Controller, ControllerRequestContext}
-import org.apache.kafka.controller.ControllerRequestContext.ANONYMOUS_CONTEXT
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions._
@@ -298,7 +298,7 @@ class ControllerApisTest {
     assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
       Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
         handleAlterPartitionRequest(buildRequest(new AlterPartitionRequest.Builder(
-          new AlterPartitionRequestData()).build(0))))
+          new AlterPartitionRequestData(), false).build(0))))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 524aded1f4..d92c76f711 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -19,25 +19,27 @@ package kafka.server
 import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 
 import java.util
-import util.Arrays.asList
+import java.util.Arrays.asList
+import java.util.Collections
+
+import kafka.api.LeaderAndIsr
+import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.metadata.{BrokerRegistrationChangeRecord, PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
+import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage}
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
+
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
-
-import java.util.Collections
-import kafka.api.LeaderAndIsr
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
-import org.apache.kafka.common.metadata.{PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
-import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
-import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage}
-import org.apache.kafka.server.common.MetadataVersion
+import org.junit.jupiter.api.Test
 
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
@@ -639,4 +641,48 @@ class MetadataCacheTest {
     assertEquals(Seq(expectedNode0, expectedNode1), partitionInfo.inSyncReplicas.toSeq)
     assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq)
   }
+
+  @Test
+  def testIsBrokerFenced(): Unit = {
+    val metadataCache = MetadataCache.kRaftMetadataCache(0)
+
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    delta.replay(new RegisterBrokerRecord()
+      .setBrokerId(0)
+      .setFenced(false))
+
+    metadataCache.setImage(delta.apply())
+
+    assertFalse(metadataCache.isBrokerFenced(0))
+
+    delta.replay(new BrokerRegistrationChangeRecord()
+      .setBrokerId(0)
+      .setFenced(1.toByte))
+
+    metadataCache.setImage(delta.apply())
+
+    assertTrue(metadataCache.isBrokerFenced(0))
+  }
+
+  @Test
+  def testIsBrokerInControlledShutdown(): Unit = {
+    val metadataCache = MetadataCache.kRaftMetadataCache(0)
+
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    delta.replay(new RegisterBrokerRecord()
+      .setBrokerId(0)
+      .setInControlledShutdown(false))
+
+    metadataCache.setImage(delta.apply())
+
+    assertFalse(metadataCache.isBrokerShuttingDown(0))
+
+    delta.replay(new BrokerRegistrationChangeRecord()
+      .setBrokerId(0)
+      .setInControlledShutdown(1.toByte))
+
+    metadataCache.setImage(delta.apply())
+
+    assertTrue(metadataCache.isBrokerShuttingDown(0))
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index df95f701c5..651451afad 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -20,12 +20,13 @@ import java.net.InetAddress
 import java.util
 import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue, TimeUnit}
 import java.util.{Optional, Properties}
-
 import kafka.api.LeaderAndIsr
 import kafka.log.{AppendOrigin, LogConfig}
+import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{MockTime, ShutdownableThread, TestUtils}
+import org.apache.kafka.common.metadata.RegisterBrokerRecord
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -70,8 +71,9 @@ class ReplicaManagerConcurrencyTest {
   def testIsrExpandAndShrinkWithConcurrentProduce(): Unit = {
     val localId = 0
     val remoteId = 1
+    val metadataCache = MetadataCache.kRaftMetadataCache(localId)
     val channel = new ControllerChannel
-    val replicaManager = buildReplicaManager(localId, channel)
+    val replicaManager = buildReplicaManager(localId, channel, metadataCache)
 
     // Start with the remote replica out of the ISR
     val initialPartitionRegistration = registration(
@@ -84,7 +86,7 @@ class ReplicaManagerConcurrencyTest {
     val topicModel = new TopicModel(Uuid.randomUuid(), "foo", Map(0 -> initialPartitionRegistration))
     val topicPartition = new TopicPartition(topicModel.name, 0)
     val topicIdPartition = new TopicIdPartition(topicModel.topicId, topicPartition)
-    val controller = new ControllerModel(topicModel, channel, replicaManager)
+    val controller = new ControllerModel(Seq(localId, remoteId), topicModel, channel, replicaManager, metadataCache)
 
     submit(new Clock(time))
     replicaManager.startup()
@@ -140,7 +142,8 @@ class ReplicaManagerConcurrencyTest {
 
   private def buildReplicaManager(
     localId: Int,
-    channel: ControllerChannel
+    channel: ControllerChannel,
+    metadataCache: MetadataCache,
   ): ReplicaManager = {
     val logDir = TestUtils.tempDir()
 
@@ -168,7 +171,7 @@ class ReplicaManagerConcurrencyTest {
       scheduler = time.scheduler,
       logManager = logManager,
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
-      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId),
+      metadataCache = metadataCache,
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = new MockAlterPartitionManager(channel)
     ) {
@@ -295,7 +298,7 @@ class ReplicaManagerConcurrencyTest {
   case object ShutdownEvent extends ControllerEvent
   case class AlterIsrEvent(
     future: CompletableFuture[LeaderAndIsr],
-    topicPartition: TopicPartition,
+    topicPartition: TopicIdPartition,
     leaderAndIsr: LeaderAndIsr
   ) extends ControllerEvent
 
@@ -307,7 +310,7 @@ class ReplicaManagerConcurrencyTest {
     }
 
     def alterIsr(
-      topicPartition: TopicPartition,
+      topicPartition: TopicIdPartition,
       leaderAndIsr: LeaderAndIsr
     ): CompletableFuture[LeaderAndIsr] = {
       val future = new CompletableFuture[LeaderAndIsr]()
@@ -325,9 +328,11 @@ class ReplicaManagerConcurrencyTest {
   }
 
   private class ControllerModel(
+    brokerIds: Seq[Int],
     topic: TopicModel,
     channel: ControllerChannel,
-    replicaManager: ReplicaManager
+    replicaManager: ReplicaManager,
+    metadataCache: KRaftMetadataCache
   ) extends ShutdownableThread(name = "controller", isInterruptible = false) {
     private var latestImage = MetadataImage.EMPTY
 
@@ -345,8 +350,15 @@ class ReplicaManagerConcurrencyTest {
       channel.poll() match {
         case InitializeEvent =>
           val delta = new MetadataDelta(latestImage)
+          brokerIds.foreach { brokerId =>
+            delta.replay(new RegisterBrokerRecord()
+              .setBrokerId(brokerId)
+              .setFenced(false)
+            )
+          }
           topic.initialize(delta)
           latestImage = delta.apply()
+          metadataCache.setImage(latestImage)
           replicaManager.applyDelta(delta.topicsDelta, latestImage)
 
         case AlterIsrEvent(future, topicPartition, leaderAndIsr) =>
@@ -380,7 +392,7 @@ class ReplicaManagerConcurrencyTest {
     }
 
     def alterIsr(
-      topicPartition: TopicPartition,
+      topicPartition: TopicIdPartition,
       leaderAndIsr: LeaderAndIsr,
       delta: MetadataDelta
     ): LeaderAndIsr = {
@@ -433,7 +445,7 @@ class ReplicaManagerConcurrencyTest {
 
   private class MockAlterPartitionManager(channel: ControllerChannel) extends AlterPartitionManager {
     override def submit(
-      topicPartition: TopicPartition,
+      topicPartition: TopicIdPartition,
       leaderAndIsr: LeaderAndIsr,
       controllerEpoch: Int
     ): CompletableFuture[LeaderAndIsr] = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a5a820d651..6d17e93782 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -599,7 +599,7 @@ class RequestQuotaTest extends BaseRequestTest {
             tp, 10, 5, Collections.singletonList(3)))
 
         case ApiKeys.ALTER_PARTITION =>
-          new AlterPartitionRequest.Builder(new AlterPartitionRequestData())
+          new AlterPartitionRequest.Builder(new AlterPartitionRequestData(), true)
 
         case ApiKeys.UPDATE_FEATURES =>
           new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData())
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e5d8152a99..05610413e9 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,8 +28,8 @@ import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
-
 import com.yammer.metrics.core.{Gauge, Meter}
+
 import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{AlterPartitionListener, Broker, EndPoint}
@@ -47,6 +47,7 @@ import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
@@ -1307,13 +1308,18 @@ object TestUtils extends Logging {
 
 
     override def submit(
-      topicPartition: TopicPartition,
+      topicPartition: TopicIdPartition,
       leaderAndIsr: LeaderAndIsr,
       controllerEpoch: Int
     ): CompletableFuture[LeaderAndIsr]= {
       val future = new CompletableFuture[LeaderAndIsr]()
       if (inFlight.compareAndSet(false, true)) {
-        isrUpdates += AlterPartitionItem(topicPartition, leaderAndIsr, future, controllerEpoch)
+        isrUpdates += AlterPartitionItem(
+          topicPartition,
+          leaderAndIsr,
+          future,
+          controllerEpoch
+        )
       } else {
         future.completeExceptionally(new OperationNotAttemptedException(
           s"Failed to enqueue AlterIsr request for $topicPartition since there is already an inflight request"))
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
index e18d68c587..e4bc2f3eb4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
@@ -17,8 +17,11 @@
 
 package org.apache.kafka.controller;
 
+
+import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 
 import java.util.OptionalLong;
 
@@ -27,9 +30,6 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
 public class ControllerRequestContext {
-    public static final ControllerRequestContext ANONYMOUS_CONTEXT =
-        new ControllerRequestContext(KafkaPrincipal.ANONYMOUS,
-            OptionalLong.empty());
 
     public static OptionalLong requestTimeoutMsToDeadlineNs(
         Time time,
@@ -39,17 +39,38 @@ public class ControllerRequestContext {
     }
 
     private final KafkaPrincipal principal;
-
     private final OptionalLong deadlineNs;
+    private final RequestHeaderData requestHeader;
 
     public ControllerRequestContext(
+        RequestHeaderData requestHeader,
         KafkaPrincipal principal,
         OptionalLong deadlineNs
     ) {
+        this.requestHeader = requestHeader;
         this.principal = principal;
         this.deadlineNs = deadlineNs;
     }
 
+    public ControllerRequestContext(
+        AuthorizableRequestContext requestContext,
+        OptionalLong deadlineNs
+    ) {
+        this(
+            new RequestHeaderData()
+                .setRequestApiKey((short) requestContext.requestType())
+                .setRequestApiVersion((short) requestContext.requestVersion())
+                .setCorrelationId(requestContext.correlationId())
+                .setClientId(requestContext.clientId()),
+            requestContext.principal(),
+            deadlineNs
+        );
+    }
+
+    public RequestHeaderData requestHeader() {
+        return requestHeader;
+    }
+
     public KafkaPrincipal principal() {
         return principal;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 88146766e3..97c0cdd782 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1619,7 +1619,7 @@ public final class QuorumController implements Controller {
             return CompletableFuture.completedFuture(new AlterPartitionResponseData());
         }
         return appendWriteEvent("alterPartition", context.deadlineNs(),
-            () -> replicationControl.alterPartition(request));
+            () -> replicationControl.alterPartition(context, request));
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 1c81954617..8382cd9c16 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -120,10 +120,13 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECO
 import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
 import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
+import static org.apache.kafka.common.protocol.Errors.INELIGIBLE_REPLICA;
 import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
 import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
+import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
 import static org.apache.kafka.common.protocol.Errors.NONE;
 import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
+import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
 import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
@@ -915,22 +918,31 @@ public class ReplicationControlManager {
         return new HashSet<>(imbalancedPartitions);
     }
 
-    ControllerResult<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request) {
+    ControllerResult<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request
+    ) {
+        short requestVersion = context.requestHeader().requestApiVersion();
         clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
         AlterPartitionResponseData response = new AlterPartitionResponseData();
         List<ApiMessageAndVersion> records = new ArrayList<>();
         for (AlterPartitionRequestData.TopicData topicData : request.topics()) {
             AlterPartitionResponseData.TopicData responseTopicData =
-                new AlterPartitionResponseData.TopicData().setName(topicData.name());
+                new AlterPartitionResponseData.TopicData().
+                    setTopicName(topicData.topicName()).
+                    setTopicId(topicData.topicId());
             response.topics().add(responseTopicData);
-            Uuid topicId = topicsByName.get(topicData.name());
-            if (topicId == null || !topics.containsKey(topicId)) {
+
+            Uuid topicId = requestVersion > 1 ? topicData.topicId() : topicsByName.get(topicData.topicName());
+            if (topicId == null || topicId.equals(Uuid.ZERO_UUID) || !topics.containsKey(topicId)) {
+                Errors error = requestVersion > 1 ? UNKNOWN_TOPIC_ID : UNKNOWN_TOPIC_OR_PARTITION;
                 for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
                     responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
+                        setErrorCode(error.code()));
                 }
-                log.info("Rejecting AlterPartition request for unknown topic ID {}.", topicId);
+                log.info("Rejecting AlterPartition request for unknown topic ID {} or name {}.",
+                    topicData.topicId(), topicData.topicName());
                 continue;
             }
 
@@ -939,7 +951,15 @@ public class ReplicationControlManager {
                 int partitionId = partitionData.partitionIndex();
                 PartitionRegistration partition = topic.parts.get(partitionId);
 
-                Errors validationError = validateAlterPartitionData(request.brokerId(), topic, partitionId, partition, partitionData);
+                Errors validationError = validateAlterPartitionData(
+                    request.brokerId(),
+                    topic,
+                    partitionId,
+                    partition,
+                    clusterControl::active,
+                    context.requestHeader().requestApiVersion(),
+                    partitionData);
+
                 if (validationError != Errors.NONE) {
                     responseTopicData.partitions().add(
                         new AlterPartitionResponseData.PartitionData()
@@ -956,7 +976,7 @@ public class ReplicationControlManager {
                     partitionId,
                     clusterControl::active,
                     featureControl.metadataVersion().isLeaderRecoverySupported());
-                if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
+                if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
                 builder.setTargetIsr(partitionData.newIsr());
@@ -979,18 +999,20 @@ public class ReplicationControlManager {
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
-                        // In this case, we report back FENCED_LEADER_EPOCH to the leader
+                        // In this case, we report back NEW_LEADER_ELECTED to the leader
                         // which made the AlterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
+                        // FENCED_LEADER_EPOCH is returned for request versions 1 and below.
+                        Errors error = requestVersion > 1 ? NEW_LEADER_ELECTED : FENCED_LEADER_EPOCH;
                         log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Returning FENCED_LEADER_EPOCH.",
-                            request.brokerId(), topic.name, partitionId);
+                            "leadership change. Returning {}.",
+                            request.brokerId(), topic.name, partitionId, error);
                         responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
-                            setErrorCode(FENCED_LEADER_EPOCH.code()));
+                            setErrorCode(error.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
@@ -1026,6 +1048,8 @@
      * @param topic current topic information stored by the replication manager
      * @param partitionId partition id being altered
      * @param partition current partition registration for the partition being altered
+     * @param isEligibleReplica a function that returns whether a replica is eligible to join the ISR
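+     * @param requestApiVersion the API version of the AlterPartition request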
      * @param partitionData partition data from the alter partition request
      *
      * @return Errors.NONE for valid alter partition data; otherwise the validation error
@@ -1035,6 +1058,8 @@ public class ReplicationControlManager {
         TopicControlInfo topic,
         int partitionId,
         PartitionRegistration partition,
+        Function<Integer, Boolean> isEligibleReplica,
+        short requestApiVersion,
         AlterPartitionRequestData.PartitionData partitionData
     ) {
         if (partition == null) {
@@ -1085,7 +1110,7 @@ public class ReplicationControlManager {
         if (leaderRecoveryState == LeaderRecoveryState.RECOVERING && newIsr.length > 1) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                     "the ISR {} had more than one replica while the leader was still " +
-                    "recovering from an unlcean leader election {}.",
+                    "recovering from an unclean leader election {}.",
                     brokerId, topic.name, partitionId, partitionData.newIsr(),
                     leaderRecoveryState);
 
@@ -1093,7 +1118,6 @@ public class ReplicationControlManager {
         }
         if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
                 leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
-
             log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                     "the leader recovery state cannot change from RECOVERED to RECOVERING.",
                     brokerId, topic.name, partitionId);
@@ -1101,6 +1125,21 @@ public class ReplicationControlManager {
             return INVALID_REQUEST;
         }
 
+        List<Integer> ineligibleReplicas = partitionData.newIsr().stream()
+            .filter(replica -> !isEligibleReplica.apply(replica))
+            .collect(Collectors.toList());
+        if (!ineligibleReplicas.isEmpty()) {
+            log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
+                    "it specified ineligible replicas {} in the new ISR {}.",
+                    brokerId, topic.name, partitionId, ineligibleReplicas, partitionData.newIsr());
+
+            if (requestApiVersion > 1) {
+                return INELIGIBLE_REPLICA;
+            } else {
+                return OPERATION_NOT_ATTEMPTED;
+            }
+        }
+
         return Errors.NONE;
     }
 
@@ -1113,7 +1152,6 @@ public class ReplicationControlManager {
      * @param brokerId      The broker id.
      * @param records       The record list to append to.
      */
-
     void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
         BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
         if (brokerRegistration == null) {
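
The two version-dependent error mappings introduced above can be read at a glance from the following stand-alone sketch. It is illustrative only: the helper class and method names are hypothetical, while the Errors constants and the version threshold are the ones used by the patch.

    import org.apache.kafka.common.protocol.Errors;

    // Hypothetical helper restating the mappings above: pre-KIP-841 clients
    // (request versions <= 1) do not know the new error codes, so the KRaft
    // controller falls back to codes those clients already handle.
    final class AlterPartitionErrorMapping {
        // Reassignment completed by the ISR change and a new leader was elected.
        static Errors onLeaderChangeDuringReassignment(short requestApiVersion) {
            return requestApiVersion > 1 ? Errors.NEW_LEADER_ELECTED : Errors.FENCED_LEADER_EPOCH;
        }

        // The proposed ISR contains a fenced or shutting-down replica.
        static Errors onIneligibleReplicaInNewIsr(short requestApiVersion) {
            return requestApiVersion > 1 ? Errors.INELIGIBLE_REPLICA : Errors.OPERATION_NOT_ATTEMPTED;
        }
    }
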
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
index d767302289..710a975778 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
@@ -103,7 +103,7 @@ public interface ClusterMetadataAuthorizer extends Authorizer {
         AclMutator aclMutator = aclMutatorOrException();
         aclBindings.forEach(b -> futures.add(new CompletableFuture<>()));
         ControllerRequestContext context = new ControllerRequestContext(
-            requestContext.principal(), OptionalLong.empty());
+            requestContext, OptionalLong.empty());
         aclMutator.createAcls(context, aclBindings).whenComplete((results, throwable) -> {
             if (throwable == null && results.size() != futures.size()) {
                 throwable = new UnknownServerException("Invalid size " +
@@ -143,7 +143,7 @@ public interface ClusterMetadataAuthorizer extends Authorizer {
         AclMutator aclMutator = aclMutatorOrException();
         filters.forEach(b -> futures.add(new CompletableFuture<>()));
         ControllerRequestContext context = new ControllerRequestContext(
-            requestContext.principal(), OptionalLong.empty());
+            requestContext, OptionalLong.empty());
         aclMutator.deleteAcls(context, filters).whenComplete((results, throwable) -> {
             if (throwable == null && results.size() != futures.size()) {
                 throwable = new UnknownServerException("Invalid size " +
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java b/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
similarity index 53%
copy from metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
copy to metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
index e18d68c587..8d70a2d82f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
@@ -17,44 +17,32 @@
 
 package org.apache.kafka.controller;
 
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.utils.Time;
-
 import java.util.OptionalLong;
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-
-public class ControllerRequestContext {
+public class ControllerRequestContextUtil {
     public static final ControllerRequestContext ANONYMOUS_CONTEXT =
-        new ControllerRequestContext(KafkaPrincipal.ANONYMOUS,
+        new ControllerRequestContext(
+            new RequestHeaderData(),
+            KafkaPrincipal.ANONYMOUS,
             OptionalLong.empty());
 
-    public static OptionalLong requestTimeoutMsToDeadlineNs(
-        Time time,
-        int millisecondsOffset
-    ) {
-        return OptionalLong.of(time.nanoseconds() + NANOSECONDS.convert(millisecondsOffset, MILLISECONDS));
+    public static ControllerRequestContext anonymousContextFor(ApiKeys apiKeys) {
+        return anonymousContextFor(apiKeys, apiKeys.latestVersion());
     }
 
-    private final KafkaPrincipal principal;
-
-    private final OptionalLong deadlineNs;
-
-    public ControllerRequestContext(
-        KafkaPrincipal principal,
-        OptionalLong deadlineNs
+    public static ControllerRequestContext anonymousContextFor(
+        ApiKeys apiKeys,
+        short version
     ) {
-        this.principal = principal;
-        this.deadlineNs = deadlineNs;
-    }
-
-    public KafkaPrincipal principal() {
-        return principal;
-    }
-
-    public OptionalLong deadlineNs() {
-        return deadlineNs;
+        return new ControllerRequestContext(
+            new RequestHeaderData()
+                .setRequestApiKey(apiKeys.id)
+                .setRequestApiVersion(version),
+            KafkaPrincipal.ANONYMOUS,
+            OptionalLong.empty()
+        );
     }
 }
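
The new test helper is typically used as below. This is a minimal sketch; replicationControl and request stand for a ReplicationControlManager and an AlterPartitionRequestData that a test would already have built.

    // Build an anonymous controller context carrying the ALTER_PARTITION api key
    // and an explicit request version, so the controller picks version-appropriate
    // error codes (e.g. INELIGIBLE_REPLICA vs OPERATION_NOT_ATTEMPTED).
    ControllerRequestContext context =
        ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, (short) 2);
    ControllerResult<AlterPartitionResponseData> result =
        replicationControl.alterPartition(context, request);
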
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 1729d95312..34ad4d9727 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -43,6 +43,7 @@ import java.util.stream.StreamSupport;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
@@ -105,7 +106,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
-import static org.apache.kafka.controller.ControllerRequestContext.ANONYMOUS_CONTEXT;
+import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -400,7 +401,7 @@ public class QuorumControllerTest {
                 .setNewIsr(Arrays.asList(1, 2, 3));
 
             AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData()
-                .setName("foo");
+                .setTopicName("foo");
             topicData.partitions().add(partitionData);
 
             AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData()
@@ -857,7 +858,7 @@ public class QuorumControllerTest {
                 CountDownLatch countDownLatch = controller.pause();
                 long now = controller.time().nanoseconds();
                 ControllerRequestContext context0 = new ControllerRequestContext(
-                    KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
+                    new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
                 CompletableFuture<CreateTopicsResponseData> createFuture =
                     controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0).
                         setTopics(new CreatableTopicCollection(Collections.singleton(
@@ -993,7 +994,7 @@ public class QuorumControllerTest {
                 .collect(Collectors.toList());
 
             AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData()
-                .setName(topicName);
+                .setTopicName(topicName);
             topicData.partitions().addAll(alterPartitions);
 
             int leaderId = 0;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 59b5488f6a..d33776ca10 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -61,12 +61,14 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
@@ -113,16 +115,20 @@ import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG;
 import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
 import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
+import static org.apache.kafka.common.protocol.Errors.INELIGIBLE_REPLICA;
 import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
 import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICATION_FACTOR;
 import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
 import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
+import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
 import static org.apache.kafka.common.protocol.Errors.NONE;
 import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
+import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
 import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
 import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
+import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -319,10 +325,14 @@ public class ReplicationControlManagerTest {
 
             String topicName = replicationControl.getTopic(topicIdPartition.topicId()).name();
             TopicData topicData = new TopicData()
-                .setName(topicName)
+                .setTopicName(topicName)
+                .setTopicId(topicIdPartition.topicId())
                 .setPartitions(singletonList(partitionData));
 
+            ControllerRequestContext requestContext =
+                anonymousContextFor(ApiKeys.ALTER_PARTITION);
             ControllerResult<AlterPartitionResponseData> alterPartition = replicationControl.alterPartition(
+                requestContext,
                 new AlterPartitionRequestData()
                     .setBrokerId(leaderId)
                     .setBrokerEpoch(registration.epoch())
@@ -857,26 +867,63 @@ public class ReplicationControlManagerTest {
             new int[][] {new int[] {0, 1, 2}});
 
         TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
-        TopicPartition topicPartition = new TopicPartition("foo", 0);
         assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
         long brokerEpoch = ctx.currentBrokerEpoch(0);
         PartitionData shrinkIsrRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
-        ControllerResult<AlterPartitionResponseData> shrinkIsrResult = sendAlterIsr(
-            replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
+        ControllerResult<AlterPartitionResponseData> shrinkIsrResult = sendAlterPartition(
+            replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), shrinkIsrRequest);
         AlterPartitionResponseData.PartitionData shrinkIsrResponse = assertAlterPartitionResponse(
-            shrinkIsrResult, topicPartition, NONE);
+            shrinkIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
 
         PartitionData expandIsrRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1, 2), LeaderRecoveryState.RECOVERED);
-        ControllerResult<AlterPartitionResponseData> expandIsrResult = sendAlterIsr(
-            replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
+        ControllerResult<AlterPartitionResponseData> expandIsrResult = sendAlterPartition(
+            replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), expandIsrRequest);
         AlterPartitionResponseData.PartitionData expandIsrResponse = assertAlterPartitionResponse(
-            expandIsrResult, topicPartition, NONE);
+            expandIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionHandleUnknownTopicIdOrName(short version) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
+
+        String topicName = "foo";
+        Uuid topicId = Uuid.randomUuid();
+
+        AlterPartitionRequestData request = new AlterPartitionRequestData()
+            .setBrokerId(0)
+            .setBrokerEpoch(100)
+            .setTopics(asList(new AlterPartitionRequestData.TopicData()
+                .setTopicName(version <= 1 ? topicName : "")
+                .setTopicId(version > 1 ? topicId : Uuid.ZERO_UUID)
+                .setPartitions(asList(new PartitionData()
+                    .setPartitionIndex(0)))));
+
+        ControllerRequestContext requestContext =
+            anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
+
+        ControllerResult<AlterPartitionResponseData> result =
+            replicationControl.alterPartition(requestContext, request);
+
+        Errors expectedError = version > 1 ? UNKNOWN_TOPIC_ID : UNKNOWN_TOPIC_OR_PARTITION;
+        AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData()
+            .setTopics(asList(new AlterPartitionResponseData.TopicData()
+                .setTopicName(version <= 1 ? topicName : "")
+                .setTopicId(version > 1 ? topicId : Uuid.ZERO_UUID)
+                .setPartitions(asList(new AlterPartitionResponseData.PartitionData()
+                    .setPartitionIndex(0)
+                    .setErrorCode(expectedError.code())))));
+
+        assertEquals(expectedResponse, result.response());
+    }
+
     @Test
     public void testInvalidAlterPartitionRequests() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -887,7 +934,6 @@ public class ReplicationControlManagerTest {
             new int[][] {new int[] {0, 1, 2}});
 
         TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
-        TopicPartition topicPartition = new TopicPartition("foo", 0);
         int leaderId = 0;
         int notLeaderId = 1;
         assertEquals(OptionalInt.of(leaderId), ctx.currentLeader(topicIdPartition));
@@ -896,57 +942,57 @@ public class ReplicationControlManagerTest {
         // Invalid leader
         PartitionData invalidLeaderRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
-        ControllerResult<AlterPartitionResponseData> invalidLeaderResult = sendAlterIsr(
+        ControllerResult<AlterPartitionResponseData> invalidLeaderResult = sendAlterPartition(
             replicationControl, notLeaderId, ctx.currentBrokerEpoch(notLeaderId),
-            "foo", invalidLeaderRequest);
-        assertAlterPartitionResponse(invalidLeaderResult, topicPartition, Errors.INVALID_REQUEST);
+            topicIdPartition.topicId(), invalidLeaderRequest);
+        assertAlterPartitionResponse(invalidLeaderResult, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Stale broker epoch
         PartitionData invalidBrokerEpochRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
-        assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr(
-            replicationControl, leaderId, brokerEpoch - 1, "foo", invalidBrokerEpochRequest));
+        assertThrows(StaleBrokerEpochException.class, () -> sendAlterPartition(
+            replicationControl, leaderId, brokerEpoch - 1, topicIdPartition.topicId(), invalidBrokerEpochRequest));
 
         // Invalid leader epoch
         PartitionData invalidLeaderEpochRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
         invalidLeaderEpochRequest.setLeaderEpoch(500);
-        ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult = sendAlterIsr(
+        ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
-            "foo", invalidLeaderEpochRequest);
-        assertAlterPartitionResponse(invalidLeaderEpochResult, topicPartition, FENCED_LEADER_EPOCH);
+            topicIdPartition.topicId(), invalidLeaderEpochRequest);
+        assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, FENCED_LEADER_EPOCH);
 
         // Invalid ISR (3 is not a valid replica)
         PartitionData invalidIsrRequest1 = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1, 3), LeaderRecoveryState.RECOVERED);
-        ControllerResult<AlterPartitionResponseData> invalidIsrResult1 = sendAlterIsr(
+        ControllerResult<AlterPartitionResponseData> invalidIsrResult1 = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
-            "foo", invalidIsrRequest1);
-        assertAlterPartitionResponse(invalidIsrResult1, topicPartition, Errors.INVALID_REQUEST);
+            topicIdPartition.topicId(), invalidIsrRequest1);
+        assertAlterPartitionResponse(invalidIsrResult1, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid ISR (does not include leader 0)
         PartitionData invalidIsrRequest2 = newAlterPartition(
             replicationControl, topicIdPartition, asList(1, 2), LeaderRecoveryState.RECOVERED);
-        ControllerResult<AlterPartitionResponseData> invalidIsrResult2 = sendAlterIsr(
+        ControllerResult<AlterPartitionResponseData> invalidIsrResult2 = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
-            "foo", invalidIsrRequest2);
-        assertAlterPartitionResponse(invalidIsrResult2, topicPartition, Errors.INVALID_REQUEST);
+            topicIdPartition.topicId(), invalidIsrRequest2);
+        assertAlterPartitionResponse(invalidIsrResult2, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid ISR length and recovery state
         PartitionData invalidIsrRecoveryRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERING);
-        ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult = sendAlterIsr(
+        ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
-            "foo", invalidIsrRecoveryRequest);
-        assertAlterPartitionResponse(invalidIsrRecoveryResult, topicPartition, Errors.INVALID_REQUEST);
+            topicIdPartition.topicId(), invalidIsrRecoveryRequest);
+        assertAlterPartitionResponse(invalidIsrRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
 
-        // Invalid recovery state transtion from RECOVERED to RECOVERING
+        // Invalid recovery state transition from RECOVERED to RECOVERING
         PartitionData invalidRecoveryRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0), LeaderRecoveryState.RECOVERING);
-        ControllerResult<AlterPartitionResponseData> invalidRecoveryResult = sendAlterIsr(
+        ControllerResult<AlterPartitionResponseData> invalidRecoveryResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
-            "foo", invalidRecoveryRequest);
-        assertAlterPartitionResponse(invalidRecoveryResult, topicPartition, Errors.INVALID_REQUEST);
+            topicIdPartition.topicId(), invalidRecoveryRequest);
+        assertAlterPartitionResponse(invalidRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
     }
 
     private PartitionData newAlterPartition(
@@ -965,11 +1011,11 @@ public class ReplicationControlManagerTest {
             .setLeaderRecoveryState(leaderRecoveryState.value());
     }
 
-    private ControllerResult<AlterPartitionResponseData> sendAlterIsr(
+    private ControllerResult<AlterPartitionResponseData> sendAlterPartition(
         ReplicationControlManager replicationControl,
         int brokerId,
         long brokerEpoch,
-        String topic,
+        Uuid topicId,
         AlterPartitionRequestData.PartitionData partitionData
     ) throws Exception {
         AlterPartitionRequestData request = new AlterPartitionRequestData()
@@ -977,29 +1023,30 @@ public class ReplicationControlManagerTest {
             .setBrokerEpoch(brokerEpoch);
 
         AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData()
-            .setName(topic);
+            .setTopicId(topicId);
         request.topics().add(topicData);
         topicData.partitions().add(partitionData);
 
-        ControllerResult<AlterPartitionResponseData> result = replicationControl.alterPartition(request);
+        ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION);
+        ControllerResult<AlterPartitionResponseData> result = replicationControl.alterPartition(requestContext, request);
         RecordTestUtils.replayAll(replicationControl, result.records());
         return result;
     }
 
     private AlterPartitionResponseData.PartitionData assertAlterPartitionResponse(
         ControllerResult<AlterPartitionResponseData> alterPartitionResult,
-        TopicPartition topicPartition,
+        TopicIdPartition topicIdPartition,
         Errors expectedError
     ) {
         AlterPartitionResponseData response = alterPartitionResult.response();
         assertEquals(1, response.topics().size());
 
         AlterPartitionResponseData.TopicData topicData = response.topics().get(0);
-        assertEquals(topicPartition.topic(), topicData.name());
+        assertEquals(topicIdPartition.topicId(), topicData.topicId());
         assertEquals(1, topicData.partitions().size());
 
         AlterPartitionResponseData.PartitionData partitionData = topicData.partitions().get(0);
-        assertEquals(topicPartition.partition(), partitionData.partitionIndex());
+        assertEquals(topicIdPartition.partitionId(), partitionData.partitionIndex());
         assertEquals(expectedError, Errors.forCode(partitionData.errorCode()));
         return partitionData;
     }
@@ -1312,8 +1359,9 @@ public class ReplicationControlManagerTest {
     private final static ListPartitionReassignmentsResponseData NONE_REASSIGNING =
         new ListPartitionReassignmentsResponseData().setErrorMessage(null);
 
-    @Test
-    public void testReassignPartitions() throws Exception {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testReassignPartitions(short version) throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3);
@@ -1398,21 +1446,167 @@ public class ReplicationControlManagerTest {
                         setErrorMessage(null)))))),
             cancelResult);
         log.info("running final alterPartition...");
+        ControllerRequestContext requestContext =
+            anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
         ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
+            requestContext,
             new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103).
-                setTopics(asList(new TopicData().setName("foo").setPartitions(asList(
-                    new PartitionData().setPartitionIndex(1).setPartitionEpoch(1).
-                        setLeaderEpoch(0).setNewIsr(asList(3, 0, 2, 1)))))));
+                setTopics(asList(new TopicData().
+                    setTopicName(version <= 1 ? "foo" : "").
+                    setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
+                    setPartitions(asList(new PartitionData().
+                        setPartitionIndex(1).
+                        setPartitionEpoch(1).
+                        setLeaderEpoch(0).
+                        setNewIsr(asList(3, 0, 2, 1)))))));
+        Errors expectedError = version > 1 ? NEW_LEADER_ELECTED : FENCED_LEADER_EPOCH;
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
-            new AlterPartitionResponseData.TopicData().setName("foo").setPartitions(asList(
+            new AlterPartitionResponseData.TopicData().
+                setTopicName(version <= 1 ? "foo" : "").
+                setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
+                setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(1).
-                    setErrorCode(FENCED_LEADER_EPOCH.code()))))),
+                    setErrorCode(expectedError.code()))))),
             alterPartitionResult.response());
         ctx.replay(alterPartitionResult.records());
         assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic(
+            "foo",
+            new int[][] {new int[] {1, 2, 3, 4}}
+        ).topicId();
+
+        List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
+        replication.handleBrokerFenced(3, fenceRecords);
+        ctx.replay(fenceRecords);
+
+        assertEquals(
+            new PartitionRegistration(
+                new int[] {1, 2, 3, 4},
+                new int[] {1, 2, 4},
+                new int[] {},
+                new int[] {},
+                1,
+                LeaderRecoveryState.RECOVERED,
+                1,
+                1),
+            replication.getPartition(fooId, 0));
+
+        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
+            .setBrokerId(1)
+            .setBrokerEpoch(101)
+            .setTopics(asList(new TopicData()
+                .setTopicName(version <= 1 ? "foo" : "")
+                .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                .setPartitions(asList(new PartitionData()
+                    .setPartitionIndex(0)
+                    .setPartitionEpoch(1)
+                    .setLeaderEpoch(1)
+                    .setNewIsr(asList(1, 2, 3, 4))))));
+
+        ControllerRequestContext requestContext =
+            anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
+
+        ControllerResult<AlterPartitionResponseData> alterPartitionResult =
+            replication.alterPartition(requestContext, alterIsrRequest);
+
+        Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED : INELIGIBLE_REPLICA;
+        assertEquals(
+            new AlterPartitionResponseData()
+                .setTopics(asList(new AlterPartitionResponseData.TopicData()
+                    .setTopicName(version <= 1 ? "foo" : "")
+                    .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                    .setPartitions(asList(new AlterPartitionResponseData.PartitionData()
+                        .setPartitionIndex(0)
+                        .setErrorCode(expectedError.code()))))),
+            alterPartitionResult.response());
+
+        fenceRecords = new ArrayList<>();
+        replication.handleBrokerUnfenced(3, 103, fenceRecords);
+        ctx.replay(fenceRecords);
+
+        alterPartitionResult = replication.alterPartition(requestContext, alterIsrRequest);
+
+        assertEquals(
+            new AlterPartitionResponseData()
+                .setTopics(asList(new AlterPartitionResponseData.TopicData()
+                    .setTopicName(version <= 1 ? "foo" : "")
+                    .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                    .setPartitions(asList(new AlterPartitionResponseData.PartitionData()
+                        .setPartitionIndex(0)
+                        .setLeaderId(1)
+                        .setLeaderEpoch(1)
+                        .setIsr(asList(1, 2, 3, 4))
+                        .setPartitionEpoch(2)
+                        .setErrorCode(NONE.code()))))),
+            alterPartitionResult.response());
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic(
+            "foo",
+            new int[][] {new int[] {1, 2, 3, 4}}
+        ).topicId();
+
+        assertEquals(
+            new PartitionRegistration(
+                new int[] {1, 2, 3, 4},
+                new int[] {1, 2, 3, 4},
+                new int[] {},
+                new int[] {},
+                1,
+                LeaderRecoveryState.RECOVERED,
+                0,
+                0),
+            replication.getPartition(fooId, 0));
+
+        ctx.inControlledShutdownBrokers(3);
+
+        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
+            .setBrokerId(1)
+            .setBrokerEpoch(101)
+            .setTopics(asList(new TopicData()
+                .setTopicName(version <= 1 ? "foo" : "")
+                .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                .setPartitions(asList(new PartitionData()
+                    .setPartitionIndex(0)
+                    .setPartitionEpoch(0)
+                    .setLeaderEpoch(0)
+                    .setNewIsr(asList(1, 2, 3, 4))))));
+
+        ControllerRequestContext requestContext =
+            anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
+
+        ControllerResult<AlterPartitionResponseData> alterPartitionResult =
+            replication.alterPartition(requestContext, alterIsrRequest);
+
+        Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED : INELIGIBLE_REPLICA;
+        assertEquals(
+            new AlterPartitionResponseData()
+                .setTopics(asList(new AlterPartitionResponseData.TopicData()
+                    .setTopicName(version <= 1 ? "foo" : "")
+                    .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                    .setPartitions(asList(new AlterPartitionResponseData.PartitionData()
+                        .setPartitionIndex(0)
+                        .setErrorCode(expectedError.code()))))),
+            alterPartitionResult.response());
+    }
+
     @Test
     public void testCancelReassignPartitions() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -1487,17 +1681,18 @@ public class ReplicationControlManagerTest {
             new ListPartitionReassignmentsTopics().setName("bar").
                 setPartitionIndexes(asList(0, 1, 2)))));
         ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
+            anonymousContextFor(ApiKeys.ALTER_PARTITION),
             new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104).
-                setTopics(asList(new TopicData().setName("bar").setPartitions(asList(
+                setTopics(asList(new TopicData().setTopicId(barId).setPartitions(asList(
                     new PartitionData().setPartitionIndex(0).setPartitionEpoch(2).
-                        setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 3, 0)))))));
+                        setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 0)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
-            new AlterPartitionResponseData.TopicData().setName("bar").setPartitions(asList(
+            new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(0).
                     setLeaderId(4).
                     setLeaderEpoch(1).
-                    setIsr(asList(4, 1, 2, 3, 0)).
+                    setIsr(asList(4, 1, 2, 0)).
                     setPartitionEpoch(3).
                     setErrorCode(NONE.code()))))),
             alterPartitionResult.response());
@@ -1785,8 +1980,9 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0, 1);
 
         ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
+            anonymousContextFor(ApiKeys.ALTER_PARTITION),
             new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
-                setTopics(asList(new AlterPartitionRequestData.TopicData().setName("foo").
+                setTopics(asList(new AlterPartitionRequestData.TopicData().setTopicId(fooId).
                     setPartitions(asList(
                         new AlterPartitionRequestData.PartitionData().
                             setPartitionIndex(0).setPartitionEpoch(0).
@@ -1795,7 +1991,7 @@ public class ReplicationControlManagerTest {
                             setPartitionIndex(2).setPartitionEpoch(0).
                             setLeaderEpoch(0).setNewIsr(asList(0, 2, 1)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
-            new AlterPartitionResponseData.TopicData().setName("foo").setPartitions(asList(
+            new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(0).
                     setLeaderId(2).
@@ -1871,13 +2067,14 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(1);
 
         ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
+            anonymousContextFor(ApiKeys.ALTER_PARTITION),
             new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
-                setTopics(asList(new AlterPartitionRequestData.TopicData().setName("foo").
+                setTopics(asList(new AlterPartitionRequestData.TopicData().setTopicId(fooId).
                     setPartitions(asList(new AlterPartitionRequestData.PartitionData().
                         setPartitionIndex(0).setPartitionEpoch(0).
                         setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
-            new AlterPartitionResponseData.TopicData().setName("foo").setPartitions(asList(
+            new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(0).
                     setLeaderId(2).
@@ -1903,13 +2100,14 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0);
 
         alterPartitionResult = replication.alterPartition(
+            anonymousContextFor(ApiKeys.ALTER_PARTITION),
             new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
-                setTopics(asList(new AlterPartitionRequestData.TopicData().setName("foo").
+                setTopics(asList(new AlterPartitionRequestData.TopicData().setTopicId(fooId).
                     setPartitions(asList(new AlterPartitionRequestData.PartitionData().
                         setPartitionIndex(2).setPartitionEpoch(0).
                         setLeaderEpoch(0).setNewIsr(asList(0, 2, 1)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
-            new AlterPartitionResponseData.TopicData().setName("foo").setPartitions(asList(
+            new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(2).
                     setLeaderId(2).