You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/25 22:25:44 UTC

[kafka] branch 2.8 updated: KAFKA-12367; Ensure partition epoch is propagated to `Partition` state (#10200)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new a6ee7e6  KAFKA-12367; Ensure partition epoch is propagated to `Partition` state (#10200)
a6ee7e6 is described below

commit a6ee7e6fb81a31f58d570e8e2758ff4ec6c13660
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 25 14:17:19 2021 -0800

    KAFKA-12367; Ensure partition epoch is propagated to `Partition` state (#10200)
    
    This patch fixes two problems with the AlterIsr handling of the quorum controller:
    
    - Ensure that partition epoch is updated correctly after partition change records and is
    propagated to Partition
    
    - Ensure that AlterIsr response includes partitions that were successfully updated
    
    As part of this patch, I've renamed BrokersToIsrs.TopicPartition to
    BrokersToIsrs.TopicIdPartition to avoid confusion with the TopicPartition object which is
    used virtually everywhere. I've attempted to address some of the testing gaps as well.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../kafka/server/metadata/MetadataPartitions.scala |  12 +-
 .../server/metadata/MetadataPartitionsTest.scala   |  54 ++++-
 .../server/RaftReplicaChangeDelegateTest.scala     | 146 +++++++++++++
 .../org/apache/kafka/controller/BrokersToIsrs.java |  18 +-
 .../controller/ReplicationControlManager.java      |  57 +++--
 .../apache/kafka/controller/BrokersToIsrsTest.java |  36 ++--
 .../kafka/controller/QuorumControllerTest.java     |   8 +-
 .../controller/ReplicationControlManagerTest.java  | 232 +++++++++++++++++++--
 9 files changed, 479 insertions(+), 86 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index cfd029b..5442fc8 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1402,7 +1402,7 @@ class Partition(val topicPartition: TopicPartition,
             case Errors.FENCED_LEADER_EPOCH =>
               debug(s"Failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
             case Errors.INVALID_UPDATE_VERSION =>
-              debug(s"Failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
+              debug(s"Failed to update ISR to $proposedIsrState due to invalid version. Giving up.")
             case _ =>
               warn(s"Failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
               sendAlterIsrRequest(proposedIsrState)
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
index bd84e7a..96ed8a5 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
@@ -39,6 +39,7 @@ object MetadataPartition {
       record.leaderEpoch(),
       record.replicas(),
       record.isr(),
+      record.partitionEpoch(),
       Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
       Collections.emptyList(),
       Collections.emptyList())
@@ -52,6 +53,7 @@ object MetadataPartition {
       partition.leaderEpoch(),
       partition.replicas(),
       partition.isr(),
+      partition.zkVersion(),
       partition.offlineReplicas(),
       prevPartition.flatMap(p => Some(p.addingReplicas)).getOrElse(Collections.emptyList()),
       prevPartition.flatMap(p => Some(p.removingReplicas)).getOrElse(Collections.emptyList())
@@ -65,6 +67,7 @@ case class MetadataPartition(topicName: String,
                              leaderEpoch: Int,
                              replicas: util.List[Integer],
                              isr: util.List[Integer],
+                             partitionEpoch: Int,
                              offlineReplicas: util.List[Integer],
                              addingReplicas: util.List[Integer],
                              removingReplicas: util.List[Integer]) {
@@ -79,13 +82,13 @@ case class MetadataPartition(topicName: String,
       setIsr(isr).
       setAddingReplicas(addingReplicas).
       setRemovingReplicas(removingReplicas).
-      setIsNew(isNew)
-    // Note: we don't set ZKVersion here.
+      setIsNew(isNew).
+      setZkVersion(partitionEpoch)
   }
 
   def isReplicaFor(brokerId: Int): Boolean = replicas.contains(Integer.valueOf(brokerId))
 
-  def copyWithChanges(record: PartitionChangeRecord): MetadataPartition = {
+  def merge(record: PartitionChangeRecord): MetadataPartition = {
     val (newLeader, newLeaderEpoch) = if (record.leader() == MetadataPartition.NO_LEADER_CHANGE) {
       (leaderId, leaderEpoch)
     } else {
@@ -102,6 +105,7 @@ case class MetadataPartition(topicName: String,
       newLeaderEpoch,
       replicas,
       newIsr,
+      partitionEpoch + 1,
       offlineReplicas,
       addingReplicas,
       removingReplicas)
@@ -132,7 +136,7 @@ class MetadataPartitionsBuilder(val brokerId: Int,
         case None => throw new RuntimeException(s"Unable to locate topic with name $name")
         case Some(partitionMap) => Option(partitionMap.get(record.partitionId())) match {
           case None => throw new RuntimeException(s"Unable to locate $name-${record.partitionId}")
-          case Some(partition) => set(partition.copyWithChanges(record))
+          case Some(partition) => set(partition.merge(record))
         }
       }
     }
diff --git a/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
index 1b4cff7..a708d73 100644
--- a/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
+++ b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
@@ -18,11 +18,14 @@
 package kafka.server.metadata
 
 import java.util.Collections
+
 import org.apache.kafka.common.Uuid
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Test, Timeout}
-
 import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.common.metadata.PartitionChangeRecord
+
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -30,16 +33,14 @@ import scala.jdk.CollectionConverters._
 @Timeout(value = 120000, unit = TimeUnit.MILLISECONDS)
 class MetadataPartitionsTest {
 
-  val emptyPartitions = MetadataPartitions(Collections.emptyMap(), Collections.emptyMap())
+  private val emptyPartitions = MetadataPartitions(Collections.emptyMap(), Collections.emptyMap())
 
   private def newPartition(topicName: String,
                            partitionIndex: Int,
                            replicas: Option[Seq[Int]] = None,
                            isr: Option[Seq[Int]] = None): MetadataPartition = {
-    val effectiveReplicas = replicas
-      .getOrElse(List(partitionIndex, partitionIndex + 1, partitionIndex + 2))
-      .map(Int.box)
-      .toList.asJava
+    val effectiveReplicas = asJavaList(replicas
+      .getOrElse(List(partitionIndex, partitionIndex + 1, partitionIndex + 2)))
 
     val effectiveIsr = isr match {
       case None => effectiveReplicas
@@ -47,9 +48,11 @@ class MetadataPartitionsTest {
     }
     new MetadataPartition(topicName,
       partitionIndex,
-      partitionIndex % 3, 100,
+      effectiveReplicas.asScala.head,
+      leaderEpoch = 100,
       effectiveReplicas,
       effectiveIsr,
+      partitionEpoch = 200,
       Collections.emptyList(),
       Collections.emptyList(),
       Collections.emptyList())
@@ -149,4 +152,41 @@ class MetadataPartitionsTest {
     assertEquals(Some("bar"), image.topicIdToName(Uuid.fromString("a1I0JF3yRzWFyOuY3F_vHw")))
     assertEquals(None, image.topicIdToName(Uuid.fromString("gdMy05W7QWG4ZjWir1DjBw")))
   }
+
+  @Test
+  def testMergePartitionChangeRecord(): Unit = {
+    val initialMetadata = newPartition(
+      topicName = "foo",
+      partitionIndex = 0,
+      replicas = Some(Seq(1, 2, 3)),
+      isr = Some(Seq(1, 2, 3))
+    )
+    assertEquals(1, initialMetadata.leaderId)
+
+    // If only the ISR changes, then the leader epoch
+    // remains the same and the partition epoch is bumped.
+    val updatedIsr = initialMetadata.merge(new PartitionChangeRecord()
+      .setPartitionId(0)
+      .setIsr(asJavaList(Seq(1, 2))))
+    assertEquals(asJavaList(Seq(1, 2)), updatedIsr.isr)
+    assertEquals(initialMetadata.leaderEpoch, updatedIsr.leaderEpoch)
+    assertEquals(initialMetadata.partitionEpoch + 1, updatedIsr.partitionEpoch)
+    assertEquals(initialMetadata.leaderId, updatedIsr.leaderId)
+
+    // If the leader changes, then both the leader epoch
+    // and the partition epoch should get bumped.
+    val updatedLeader = initialMetadata.merge(new PartitionChangeRecord()
+      .setPartitionId(0)
+      .setLeader(2)
+      .setIsr(asJavaList(Seq(2, 3))))
+    assertEquals(asJavaList(Seq(2, 3)), updatedLeader.isr)
+    assertEquals(initialMetadata.leaderEpoch + 1, updatedLeader.leaderEpoch)
+    assertEquals(initialMetadata.partitionEpoch + 1, updatedLeader.partitionEpoch)
+    assertEquals(2, updatedLeader.leaderId)
+  }
+
+  private def asJavaList(replicas: Iterable[Int]): java.util.List[Integer] = {
+    replicas.map(Int.box).toList.asJava
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/RaftReplicaChangeDelegateTest.scala b/core/src/test/scala/unit/kafka/server/RaftReplicaChangeDelegateTest.scala
new file mode 100644
index 0000000..609f1cf
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/RaftReplicaChangeDelegateTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.Collections
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.server.checkpoints.OffsetCheckpoints
+import kafka.server.metadata.{MetadataBroker, MetadataBrokers, MetadataPartition}
+import org.apache.kafka.common.message.LeaderAndIsrRequestData
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+
+import scala.jdk.CollectionConverters._
+
+class RaftReplicaChangeDelegateTest {
+  private val listenerName = new ListenerName("PLAINTEXT")
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testLeaderAndIsrPropagation(isLeader: Boolean): Unit = {
+    val leaderId = 0
+    val topicPartition = new TopicPartition("foo", 5)
+    val replicas = Seq(0, 1, 2).map(Int.box).asJava
+
+    val helper = mockedHelper()
+    val partition = mock(classOf[Partition])
+    when(partition.topicPartition).thenReturn(topicPartition)
+
+    val highWatermarkCheckpoints = mock(classOf[OffsetCheckpoints])
+    when(highWatermarkCheckpoints.fetch(
+      anyString(),
+      ArgumentMatchers.eq(topicPartition)
+    )).thenReturn(None)
+
+    val metadataPartition = new MetadataPartition(
+      topicName = topicPartition.topic,
+      partitionIndex = topicPartition.partition,
+      leaderId = leaderId,
+      leaderEpoch = 27,
+      replicas = replicas,
+      isr = replicas,
+      partitionEpoch = 50,
+      offlineReplicas = Collections.emptyList(),
+      addingReplicas = Collections.emptyList(),
+      removingReplicas = Collections.emptyList()
+    )
+
+    val expectedLeaderAndIsr = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+      .setTopicName(topicPartition.topic)
+      .setPartitionIndex(topicPartition.partition)
+      .setIsNew(true)
+      .setLeader(leaderId)
+      .setLeaderEpoch(27)
+      .setReplicas(replicas)
+      .setIsr(replicas)
+      .setAddingReplicas(Collections.emptyList())
+      .setRemovingReplicas(Collections.emptyList())
+      .setZkVersion(50)
+
+    val delegate = new RaftReplicaChangeDelegate(helper)
+    val updatedPartitions = if (isLeader) {
+      when(partition.makeLeader(expectedLeaderAndIsr, highWatermarkCheckpoints))
+        .thenReturn(true)
+      delegate.makeLeaders(
+        prevPartitionsAlreadyExisting = Set.empty,
+        partitionStates = Map(partition -> metadataPartition),
+        highWatermarkCheckpoints,
+        metadataOffset = Some(500)
+      )
+    } else {
+      when(partition.makeFollower(expectedLeaderAndIsr, highWatermarkCheckpoints))
+        .thenReturn(true)
+      when(partition.leaderReplicaIdOpt).thenReturn(Some(leaderId))
+      delegate.makeFollowers(
+        prevPartitionsAlreadyExisting = Set.empty,
+        currentBrokers = aliveBrokers(replicas),
+        partitionStates = Map(partition -> metadataPartition),
+        highWatermarkCheckpoints,
+        metadataOffset = Some(500)
+      )
+    }
+
+    assertEquals(Set(partition), updatedPartitions)
+  }
+
+  private def aliveBrokers(replicas: java.util.List[Integer]): MetadataBrokers = {
+    def mkNode(replicaId: Int): Node = {
+      new Node(replicaId, "localhost", 9092 + replicaId, "")
+    }
+
+    val brokers = replicas.asScala.map { replicaId =>
+      replicaId -> MetadataBroker(
+        id = replicaId,
+        rack = "",
+        endpoints = Map(listenerName.value -> mkNode(replicaId)),
+        fenced = false
+      )
+    }.toMap
+
+    MetadataBrokers(brokers.values.toList.asJava, brokers.asJava)
+  }
+
+  private def mockedHelper(): RaftReplicaChangeDelegateHelper = {
+    val helper = mock(classOf[RaftReplicaChangeDelegateHelper])
+
+    val stateChangeLogger = mock(classOf[StateChangeLogger])
+    when(helper.stateChangeLogger).thenReturn(stateChangeLogger)
+    when(stateChangeLogger.isDebugEnabled).thenReturn(false)
+    when(stateChangeLogger.isTraceEnabled).thenReturn(false)
+
+    val replicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    when(helper.replicaFetcherManager).thenReturn(replicaFetcherManager)
+
+    val replicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])
+    when(helper.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
+
+    val config = mock(classOf[KafkaConfig])
+    when(config.interBrokerListenerName).thenReturn(listenerName)
+    when(helper.config).thenReturn(config)
+
+    helper
+  }
+
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index 6b219eb..46148e7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -54,11 +54,11 @@ public class BrokersToIsrs {
 
     private final static int REPLICA_MASK = 0x7fff_ffff;
 
-    static class TopicPartition {
+    static class TopicIdPartition {
         private final Uuid topicId;
         private final int partitionId;
 
-        TopicPartition(Uuid topicId, int partitionId) {
+        TopicIdPartition(Uuid topicId, int partitionId) {
             this.topicId = topicId;
             this.partitionId = partitionId;
         }
@@ -73,8 +73,8 @@ public class BrokersToIsrs {
 
         @Override
         public boolean equals(Object o) {
-            if (!(o instanceof TopicPartition)) return false;
-            TopicPartition other = (TopicPartition) o;
+            if (!(o instanceof TopicIdPartition)) return false;
+            TopicIdPartition other = (TopicIdPartition) o;
             return other.topicId.equals(topicId) && other.partitionId == partitionId;
         }
 
@@ -89,13 +89,13 @@ public class BrokersToIsrs {
         }
     }
 
-    static class PartitionsOnReplicaIterator implements Iterator<TopicPartition> {
+    static class PartitionsOnReplicaIterator implements Iterator<TopicIdPartition> {
         private final Iterator<Entry<Uuid, int[]>> iterator;
         private final boolean leaderOnly;
         private int offset = 0;
         Uuid uuid = Uuid.ZERO_UUID;
         int[] replicas = EMPTY;
-        private TopicPartition next = null;
+        private TopicIdPartition next = null;
 
         PartitionsOnReplicaIterator(Map<Uuid, int[]> topicMap, boolean leaderOnly) {
             this.iterator = topicMap.entrySet().iterator();
@@ -115,18 +115,18 @@ public class BrokersToIsrs {
                 }
                 int replica = replicas[offset++];
                 if ((!leaderOnly) || (replica & LEADER_FLAG) != 0) {
-                    next = new TopicPartition(uuid, replica & REPLICA_MASK);
+                    next = new TopicIdPartition(uuid, replica & REPLICA_MASK);
                     return true;
                 }
             }
         }
 
         @Override
-        public TopicPartition next() {
+        public TopicIdPartition next() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
-            TopicPartition result = next;
+            TopicIdPartition result = next;
             next = null;
             return result;
         }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index ccf83bb..3d51bc4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -49,7 +49,7 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
@@ -102,13 +102,13 @@ public class ReplicationControlManager {
     }
 
     static class PartitionControlInfo {
-        private final int[] replicas;
-        private final int[] isr;
-        private final int[] removingReplicas;
-        private final int[] addingReplicas;
-        private final int leader;
-        private final int leaderEpoch;
-        private final int partitionEpoch;
+        public final int[] replicas;
+        public final int[] isr;
+        public final int[] removingReplicas;
+        public final int[] addingReplicas;
+        public final int leader;
+        public final int leaderEpoch;
+        public final int partitionEpoch;
 
         PartitionControlInfo(PartitionRecord record) {
             this(Replicas.toArray(record.replicas()),
@@ -581,6 +581,12 @@ public class ReplicationControlManager {
                         setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
                     continue;
                 }
+                if (request.brokerId() != partition.leader) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                    continue;
+                }
                 if (partitionData.leaderEpoch() != partition.leaderEpoch) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
@@ -611,6 +617,13 @@ public class ReplicationControlManager {
                     setPartitionId(partitionData.partitionIndex()).
                     setTopicId(topic.id).
                     setIsr(partitionData.newIsr()), (short) 0));
+                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                    setPartitionIndex(partitionData.partitionIndex()).
+                    setErrorCode(Errors.NONE.code()).
+                    setLeaderId(partition.leader).
+                    setLeaderEpoch(partition.leaderEpoch).
+                    setCurrentIsrVersion(partition.partitionEpoch + 1).
+                    setIsr(partitionData.newIsr()));
             }
         }
         return new ControllerResult<>(records, response);
@@ -663,21 +676,21 @@ public class ReplicationControlManager {
      * @param records               The record list to append to.
      */
     void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
-        Iterator<TopicPartition> iterator = brokersToIsrs.iterator(brokerId, false);
+        Iterator<TopicIdPartition> iterator = brokersToIsrs.iterator(brokerId, false);
         while (iterator.hasNext()) {
-            TopicPartition topicPartition = iterator.next();
-            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            TopicIdPartition topicIdPartition = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPartition.topicId());
             if (topic == null) {
-                throw new RuntimeException("Topic ID " + topicPartition.topicId() + " existed in " +
+                throw new RuntimeException("Topic ID " + topicIdPartition.topicId() + " existed in " +
                     "isrMembers, but not in the topics map.");
             }
-            PartitionControlInfo partition = topic.parts.get(topicPartition.partitionId());
+            PartitionControlInfo partition = topic.parts.get(topicIdPartition.partitionId());
             if (partition == null) {
-                throw new RuntimeException("Partition " + topicPartition +
+                throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
             PartitionChangeRecord record = new PartitionChangeRecord().
-                setPartitionId(topicPartition.partitionId()).
+                setPartitionId(topicIdPartition.partitionId()).
                 setTopicId(topic.id);
             int[] newIsr = Replicas.copyWithout(partition.isr, brokerId);
             if (newIsr.length == 0) {
@@ -727,24 +740,24 @@ public class ReplicationControlManager {
      * @param records       The record list to append to.
      */
     void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
-        Iterator<TopicPartition> iterator = brokersToIsrs.noLeaderIterator();
+        Iterator<TopicIdPartition> iterator = brokersToIsrs.noLeaderIterator();
         while (iterator.hasNext()) {
-            TopicPartition topicPartition = iterator.next();
-            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            TopicIdPartition topicIdPartition = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPartition.topicId());
             if (topic == null) {
-                throw new RuntimeException("Topic ID " + topicPartition.topicId() + " existed in " +
+                throw new RuntimeException("Topic ID " + topicIdPartition.topicId() + " existed in " +
                     "isrMembers, but not in the topics map.");
             }
-            PartitionControlInfo partition = topic.parts.get(topicPartition.partitionId());
+            PartitionControlInfo partition = topic.parts.get(topicIdPartition.partitionId());
             if (partition == null) {
-                throw new RuntimeException("Partition " + topicPartition +
+                throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
             // TODO: if this partition is configured for unclean leader election,
             // check the replica set rather than the ISR.
             if (Replicas.contains(partition.isr, brokerId)) {
                 records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-                    setPartitionId(topicPartition.partitionId()).
+                    setPartitionId(topicIdPartition.partitionId()).
                     setTopicId(topic.id).
                     setLeader(brokerId), (short) 0));
             }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
index 6f124ad..525bf1e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
@@ -20,7 +20,7 @@ package org.apache.kafka.controller;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.controller.BrokersToIsrs.PartitionsOnReplicaIterator;
-import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -38,16 +38,16 @@ public class BrokersToIsrsTest {
         Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")
     };
 
-    private static Set<TopicPartition> toSet(TopicPartition... partitions) {
-        HashSet<TopicPartition> set = new HashSet<>();
-        for (TopicPartition partition : partitions) {
+    private static Set<TopicIdPartition> toSet(TopicIdPartition... partitions) {
+        HashSet<TopicIdPartition> set = new HashSet<>();
+        for (TopicIdPartition partition : partitions) {
             set.add(partition);
         }
         return set;
     }
 
-    private static Set<TopicPartition> toSet(PartitionsOnReplicaIterator iterator) {
-        HashSet<TopicPartition> set = new HashSet<>();
+    private static Set<TopicIdPartition> toSet(PartitionsOnReplicaIterator iterator) {
+        HashSet<TopicIdPartition> set = new HashSet<>();
         while (iterator.hasNext()) {
             set.add(iterator.next());
         }
@@ -61,18 +61,18 @@ public class BrokersToIsrsTest {
         assertEquals(toSet(), toSet(brokersToIsrs.iterator(1, false)));
         brokersToIsrs.update(UUIDS[0], 0, null, new int[] {1, 2, 3}, -1, 1);
         brokersToIsrs.update(UUIDS[1], 1, null, new int[] {2, 3, 4}, -1, 4);
-        assertEquals(toSet(new TopicPartition(UUIDS[0], 0)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[0], 0)),
             toSet(brokersToIsrs.iterator(1, false)));
-        assertEquals(toSet(new TopicPartition(UUIDS[0], 0),
-                           new TopicPartition(UUIDS[1], 1)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[0], 0),
+                           new TopicIdPartition(UUIDS[1], 1)),
             toSet(brokersToIsrs.iterator(2, false)));
-        assertEquals(toSet(new TopicPartition(UUIDS[1], 1)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[1], 1)),
             toSet(brokersToIsrs.iterator(4, false)));
         assertEquals(toSet(), toSet(brokersToIsrs.iterator(5, false)));
         brokersToIsrs.update(UUIDS[1], 2, null, new int[] {3, 2, 1}, -1, 3);
-        assertEquals(toSet(new TopicPartition(UUIDS[0], 0),
-                new TopicPartition(UUIDS[1], 1),
-                new TopicPartition(UUIDS[1], 2)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[0], 0),
+                new TopicIdPartition(UUIDS[1], 1),
+                new TopicIdPartition(UUIDS[1], 2)),
             toSet(brokersToIsrs.iterator(2, false)));
     }
 
@@ -82,14 +82,14 @@ public class BrokersToIsrsTest {
         BrokersToIsrs brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
         brokersToIsrs.update(UUIDS[0], 0, null, new int[]{1, 2, 3}, -1, 1);
         brokersToIsrs.update(UUIDS[1], 1, null, new int[]{2, 3, 4}, -1, 4);
-        assertEquals(toSet(new TopicPartition(UUIDS[0], 0)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[0], 0)),
             toSet(brokersToIsrs.iterator(1, true)));
         assertEquals(toSet(), toSet(brokersToIsrs.iterator(2, true)));
-        assertEquals(toSet(new TopicPartition(UUIDS[1], 1)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[1], 1)),
             toSet(brokersToIsrs.iterator(4, true)));
         brokersToIsrs.update(UUIDS[0], 0, new int[]{1, 2, 3}, new int[]{1, 2, 3}, 1, 2);
         assertEquals(toSet(), toSet(brokersToIsrs.iterator(1, true)));
-        assertEquals(toSet(new TopicPartition(UUIDS[0], 0)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[0], 0)),
             toSet(brokersToIsrs.iterator(2, true)));
     }
 
@@ -98,12 +98,12 @@ public class BrokersToIsrsTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         BrokersToIsrs brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
         brokersToIsrs.update(UUIDS[0], 2, null, new int[]{1, 2, 3}, -1, 3);
-        assertEquals(toSet(new TopicPartition(UUIDS[0], 2)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
             toSet(brokersToIsrs.iterator(3, true)));
         assertEquals(toSet(), toSet(brokersToIsrs.iterator(2, true)));
         assertEquals(toSet(), toSet(brokersToIsrs.noLeaderIterator()));
         brokersToIsrs.update(UUIDS[0], 2, new int[]{1, 2, 3}, new int[]{1, 2, 3}, 3, -1);
-        assertEquals(toSet(new TopicPartition(UUIDS[0], 2)),
+        assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
             toSet(brokersToIsrs.noLeaderIterator()));
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 16b52d1..fcc47f1 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCol
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
@@ -157,9 +157,9 @@ public class QuorumControllerTest {
                             setCurrentMetadataOffset(100000L)).get());
                 assertEquals(Errors.NONE.code(), active.createTopics(
                     createTopicsRequestData).get().topics().find("foo").errorCode());
-                CompletableFuture<TopicPartition> topicPartitionFuture = active.appendReadEvent(
+                CompletableFuture<TopicIdPartition> topicPartitionFuture = active.appendReadEvent(
                     "debugGetPartition", () -> {
-                        Iterator<TopicPartition> iterator = active.
+                        Iterator<TopicIdPartition> iterator = active.
                             replicationControl().brokersToIsrs().iterator(0, true);
                         assertTrue(iterator.hasNext());
                         return iterator.next();
@@ -168,7 +168,7 @@ public class QuorumControllerTest {
                 active.unregisterBroker(0).get();
                 topicPartitionFuture = active.appendReadEvent(
                     "debugGetPartition", () -> {
-                        Iterator<TopicPartition> iterator = active.
+                        Iterator<TopicIdPartition> iterator = active.
                             replicationControl().brokersToIsrs().noLeaderIterator();
                         assertTrue(iterator.hasNext());
                         return iterator.next();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 9cc4173..a639614 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -17,20 +17,17 @@
 
 package org.apache.kafka.controller;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.Errors;
@@ -40,16 +37,30 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
-import static org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
 import static org.apache.kafka.controller.ReplicationControlManager.PartitionControlInfo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 
 @Timeout(40)
@@ -74,22 +85,22 @@ public class ReplicationControlManagerTest {
             clusterControl);
     }
 
-    private static void registerBroker(int brokerId, ClusterControlManager clusterControl) {
+    private static void registerBroker(int brokerId, ReplicationControlManager replicationControl) {
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-            setBrokerEpoch(100).setBrokerId(brokerId);
+            setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
         brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
             setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
             setPort((short) 9092 + brokerId).
             setName("PLAINTEXT").
             setHost("localhost"));
-        clusterControl.replay(brokerRecord);
+        replicationControl.clusterControl.replay(brokerRecord);
     }
 
     private static void unfenceBroker(int brokerId,
                                       ReplicationControlManager replicationControl) throws Exception {
         ControllerResult<BrokerHeartbeatReply> result = replicationControl.
             processBrokerHeartbeat(new BrokerHeartbeatRequestData().
-                setBrokerId(brokerId).setBrokerEpoch(100).setCurrentMetadataOffset(1).
+                setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).setCurrentMetadataOffset(1).
                 setWantFence(false).setWantShutDown(false), 0);
         assertEquals(new BrokerHeartbeatReply(true, false, false, false), result.response());
         ControllerTestUtils.replayAll(replicationControl.clusterControl, result.records());
@@ -109,11 +120,11 @@ public class ReplicationControlManagerTest {
                 setErrorMessage("Unable to replicate the partition 3 times: there are only 0 usable brokers"));
         assertEquals(expectedResponse, result.response());
 
-        registerBroker(0, replicationControl.clusterControl);
+        registerBroker(0, replicationControl);
         unfenceBroker(0, replicationControl);
-        registerBroker(1, replicationControl.clusterControl);
+        registerBroker(1, replicationControl);
         unfenceBroker(1, replicationControl);
-        registerBroker(2, replicationControl.clusterControl);
+        registerBroker(2, replicationControl);
         unfenceBroker(2, replicationControl);
         ControllerResult<CreateTopicsResponseData> result2 =
             replicationControl.createTopics(request);
@@ -180,7 +191,7 @@ public class ReplicationControlManagerTest {
     public void testRemoveLeaderships() throws Exception {
         ReplicationControlManager replicationControl = newReplicationControlManager();
         for (int i = 0; i < 6; i++) {
-            registerBroker(i, replicationControl.clusterControl);
+            registerBroker(i, replicationControl);
             unfenceBroker(i, replicationControl);
         }
         CreatableTopicResult result = createTestTopic(replicationControl, "foo",
@@ -190,9 +201,9 @@ public class ReplicationControlManagerTest {
                 new int[] {2, 3, 0},
                 new int[] {0, 2, 1}
             });
-        Set<TopicPartition> expectedPartitions = new HashSet<>();
-        expectedPartitions.add(new TopicPartition(result.topicId(), 0));
-        expectedPartitions.add(new TopicPartition(result.topicId(), 3));
+        Set<TopicIdPartition> expectedPartitions = new HashSet<>();
+        expectedPartitions.add(new TopicIdPartition(result.topicId(), 0));
+        expectedPartitions.add(new TopicIdPartition(result.topicId(), 3));
         assertEquals(expectedPartitions, ControllerTestUtils.
             iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
         List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -201,4 +212,183 @@ public class ReplicationControlManagerTest {
         assertEquals(Collections.emptySet(), ControllerTestUtils.
             iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
     }
+
+    @Test
+    public void testShrinkAndExpandIsr() throws Exception {
+        ReplicationControlManager replicationControl = newReplicationControlManager();
+        for (int i = 0; i < 3; i++) {
+            registerBroker(i, replicationControl);
+            unfenceBroker(i, replicationControl);
+        }
+        CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
+            new int[][] {new int[] {0, 1, 2}});
+
+        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
+        TopicPartition topicPartition = new TopicPartition("foo", 0);
+        assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
+        long brokerEpoch = currentBrokerEpoch(replicationControl, 0);
+        AlterIsrRequestData.PartitionData shrinkIsrRequest = newAlterIsrPartition(
+            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+        ControllerResult<AlterIsrResponseData> shrinkIsrResult = sendAlterIsr(
+            replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
+        AlterIsrResponseData.PartitionData shrinkIsrResponse = assertAlterIsrResponse(
+            shrinkIsrResult, topicPartition, Errors.NONE);
+        assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
+
+        AlterIsrRequestData.PartitionData expandIsrRequest = newAlterIsrPartition(
+            replicationControl, topicIdPartition, Arrays.asList(0, 1, 2));
+        ControllerResult<AlterIsrResponseData> expandIsrResult = sendAlterIsr(
+            replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
+        AlterIsrResponseData.PartitionData expandIsrResponse = assertAlterIsrResponse(
+            expandIsrResult, topicPartition, Errors.NONE);
+        assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, expandIsrResponse);
+    }
+
+    @Test
+    public void testInvalidAlterIsrRequests() throws Exception {
+        ReplicationControlManager replicationControl = newReplicationControlManager();
+        for (int i = 0; i < 3; i++) {
+            registerBroker(i, replicationControl);
+            unfenceBroker(i, replicationControl);
+        }
+        CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
+            new int[][] {new int[] {0, 1, 2}});
+
+        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
+        TopicPartition topicPartition = new TopicPartition("foo", 0);
+        assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
+        long brokerEpoch = currentBrokerEpoch(replicationControl, 0);
+
+        // Invalid leader
+        AlterIsrRequestData.PartitionData invalidLeaderRequest = newAlterIsrPartition(
+            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+        ControllerResult<AlterIsrResponseData> invalidLeaderResult = sendAlterIsr(
+            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            "foo", invalidLeaderRequest);
+        assertAlterIsrResponse(invalidLeaderResult, topicPartition, Errors.INVALID_REQUEST);
+
+        // Stale broker epoch
+        AlterIsrRequestData.PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
+            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+        assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr(
+            replicationControl, 0, brokerEpoch - 1, "foo", invalidBrokerEpochRequest));
+
+        // Invalid leader epoch
+        AlterIsrRequestData.PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
+            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+        invalidLeaderEpochRequest.setLeaderEpoch(500);
+        ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = sendAlterIsr(
+            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            "foo", invalidLeaderEpochRequest);
+        assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, Errors.INVALID_REQUEST);
+
+        // Invalid ISR (3 is not a valid replica)
+        AlterIsrRequestData.PartitionData invalidIsrRequest1 = newAlterIsrPartition(
+            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+        invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
+        ControllerResult<AlterIsrResponseData> invalidIsrResult1 = sendAlterIsr(
+            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            "foo", invalidIsrRequest1);
+        assertAlterIsrResponse(invalidIsrResult1, topicPartition, Errors.INVALID_REQUEST);
+
+        // Invalid ISR (does not include leader 0)
+        AlterIsrRequestData.PartitionData invalidIsrRequest2 = newAlterIsrPartition(
+            replicationControl, topicIdPartition, Arrays.asList(0, 1));
+        invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
+        ControllerResult<AlterIsrResponseData> invalidIsrResult2 = sendAlterIsr(
+            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            "foo", invalidIsrRequest2);
+        assertAlterIsrResponse(invalidIsrResult2, topicPartition, Errors.INVALID_REQUEST);
+    }
+
+    private long currentBrokerEpoch(
+        ReplicationControlManager replicationControl,
+        int brokerId
+    ) {
+        Map<Integer, BrokerRegistration> registrations = replicationControl.clusterControl.brokerRegistrations();
+        BrokerRegistration registration = registrations.get(brokerId);
+        assertNotNull(registration, "No current registration for broker " + brokerId);
+        return registration.epoch();
+    }
+
+    private OptionalInt currentLeader(
+        ReplicationControlManager replicationControl,
+        TopicIdPartition topicIdPartition
+    ) {
+        PartitionControlInfo partitionControl =
+            replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
+        if (partitionControl.leader < 0) {
+            return OptionalInt.empty();
+        } else {
+            return OptionalInt.of(partitionControl.leader);
+        }
+    }
+
+    private AlterIsrRequestData.PartitionData newAlterIsrPartition(
+        ReplicationControlManager replicationControl,
+        TopicIdPartition topicIdPartition,
+        List<Integer> newIsr
+    ) {
+        PartitionControlInfo partitionControl =
+            replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
+        return new AlterIsrRequestData.PartitionData()
+            .setPartitionIndex(0)
+            .setLeaderEpoch(partitionControl.leaderEpoch)
+            .setCurrentIsrVersion(partitionControl.partitionEpoch)
+            .setNewIsr(newIsr);
+    }
+
+    private ControllerResult<AlterIsrResponseData> sendAlterIsr(
+        ReplicationControlManager replicationControl,
+        int brokerId,
+        long brokerEpoch,
+        String topic,
+        AlterIsrRequestData.PartitionData partitionData
+    ) throws Exception {
+        AlterIsrRequestData request = new AlterIsrRequestData()
+            .setBrokerId(brokerId)
+            .setBrokerEpoch(brokerEpoch);
+
+        AlterIsrRequestData.TopicData topicData = new AlterIsrRequestData.TopicData()
+            .setName(topic);
+        request.topics().add(topicData);
+        topicData.partitions().add(partitionData);
+
+        ControllerResult<AlterIsrResponseData> result = replicationControl.alterIsr(request);
+        ControllerTestUtils.replayAll(replicationControl, result.records());
+        return result;
+    }
+
+    private AlterIsrResponseData.PartitionData assertAlterIsrResponse(
+        ControllerResult<AlterIsrResponseData> alterIsrResult,
+        TopicPartition topicPartition,
+        Errors expectedError
+    ) {
+        AlterIsrResponseData response = alterIsrResult.response();
+        assertEquals(1, response.topics().size());
+
+        AlterIsrResponseData.TopicData topicData = response.topics().get(0);
+        assertEquals(topicPartition.topic(), topicData.name());
+        assertEquals(1, topicData.partitions().size());
+
+        AlterIsrResponseData.PartitionData partitionData = topicData.partitions().get(0);
+        assertEquals(topicPartition.partition(), partitionData.partitionIndex());
+        assertEquals(expectedError, Errors.forCode(partitionData.errorCode()));
+        return partitionData;
+    }
+
+    private void assertConsistentAlterIsrResponse(
+        ReplicationControlManager replicationControl,
+        TopicIdPartition topicIdPartition,
+        AlterIsrResponseData.PartitionData partitionData
+    ) {
+        PartitionControlInfo partitionControl =
+            replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
+        assertEquals(partitionControl.leader, partitionData.leaderId());
+        assertEquals(partitionControl.leaderEpoch, partitionData.leaderEpoch());
+        assertEquals(partitionControl.partitionEpoch, partitionData.currentIsrVersion());
+        List<Integer> expectedIsr = IntStream.of(partitionControl.isr).boxed().collect(Collectors.toList());
+        assertEquals(expectedIsr, partitionData.isr());
+    }
+
 }