You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/08/17 20:11:17 UTC

[kafka] branch trunk updated: KAFKA-13198: Stop replicas when reassigned (#11216)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9bcf4a5  KAFKA-13198: Stop replicas when reassigned (#11216)
9bcf4a5 is described below

commit 9bcf4a525b9a08b1d0a5f8c134c46f70c84692a0
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Tue Aug 17 13:10:03 2021 -0700

    KAFKA-13198: Stop replicas when reassigned (#11216)
    
    Stop the replica and resign the coordinators when a replica gets reassigned away from a topic partition.
    
    1. Implement localChanges in TopicsDelta and TopicDelta to return all of the partitions that were deleted, became leader and became follower for the given broker id.
    2. Add tests for TopicsDelta::localChanges
    3. Resign coordinators that were moved away from the consumer offset and transaction topic partitions.
    4. Add replica manager tests for testing reassignment of replicas and removal of topic.
    5. Add a new type LocalReplicaChanges that encapsulates topic partitions deleted, became leader and became follower.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   6 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   6 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  83 ++---
 .../server/metadata/BrokerMetadataPublisher.scala  |  28 +-
 .../kafka/server/KRaftClusterTest.scala            |  33 ++
 .../unit/kafka/server/ReplicaManagerTest.scala     | 351 +++++++++++++--------
 .../apache/kafka/image/LocalReplicaChanges.java    |  86 +++++
 .../java/org/apache/kafka/image/TopicDelta.java    |  72 +++--
 .../java/org/apache/kafka/image/TopicsDelta.java   |  39 +++
 .../org/apache/kafka/image/TopicsImageTest.java    | 239 +++++++++++++-
 10 files changed, 705 insertions(+), 238 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index fd1094b..d6e82e7 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -113,7 +113,7 @@ class BrokerServer(
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
 
-  var replicaManager: ReplicaManager = null
+  @volatile private[this] var _replicaManager: ReplicaManager = null
 
   var credentialProvider: CredentialProvider = null
   var tokenCache: DelegationTokenCache = null
@@ -173,6 +173,8 @@ class BrokerServer(
     true
   }
 
+  def replicaManager: ReplicaManager = _replicaManager
+
   def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
     try {
@@ -250,7 +252,7 @@ class BrokerServer(
       )
       alterIsrManager.start()
 
-      this.replicaManager = new ReplicaManager(config, metrics, time, None,
+      this._replicaManager = new ReplicaManager(config, metrics, time, None,
         kafkaScheduler, logManager, isShuttingDown, quotaManagers,
         brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
         threadNamePrefix)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2c03bd2..0a565e1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -115,7 +115,7 @@ class KafkaServer(
   var logDirFailureChannel: LogDirFailureChannel = null
   var logManager: LogManager = null
 
-  var replicaManager: ReplicaManager = null
+  @volatile private[this] var _replicaManager: ReplicaManager = null
   var adminManager: ZkAdminManager = null
   var tokenManager: DelegationTokenManager = null
 
@@ -170,6 +170,8 @@ class KafkaServer(
 
   private[kafka] def featureChangeListener = _featureChangeListener
 
+  def replicaManager: ReplicaManager = _replicaManager
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
@@ -308,7 +310,7 @@ class KafkaServer(
         }
         alterIsrManager.start()
 
-        replicaManager = createReplicaManager(isShuttingDown)
+        _replicaManager = createReplicaManager(isShuttingDown)
         replicaManager.startup()
 
         val brokerInfo = createBrokerInfo
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index bfce8c4..7b104e9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -60,8 +60,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.image.{MetadataImage, TopicsDelta}
-import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq, Set, mutable}
@@ -84,8 +83,6 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
   }
 }
 
-case class LocalLeaderInfo(topicId: Uuid, partition: PartitionRegistration)
-
 /**
  * Result metadata of a log read operation on the log
  * @param info @FetchDataInfo returned by the @Log read
@@ -434,7 +431,9 @@ class ReplicaManager(val config: KafkaConfig,
    * @return                    A map from partitions to exceptions which occurred.
    *                            If no errors occurred, the map will be empty.
    */
-  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+  protected def stopPartitions(
+    partitionsToStop: Map[TopicPartition, Boolean]
+  ): Map[TopicPartition, Throwable] = {
     // First stop fetchers for all partitions.
     val partitions = partitionsToStop.keySet
     replicaFetcherManager.removeFetcherForPartitions(partitions)
@@ -2074,32 +2073,6 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
-    : (mutable.HashMap[TopicPartition, Boolean],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
-    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
-    delta.deletedTopicIds().forEach { topicId =>
-      val topicImage = delta.image().getTopic(topicId)
-      topicImage.partitions().keySet().forEach { partitionId =>
-        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
-      }
-    }
-    val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    delta.changedTopics().values().forEach { topicDelta =>
-      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
-        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
-        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-    }
-    (deleted, newLocalLeaders, newLocalFollowers)
-  }
-
   /**
    * Apply a KRaft topic change delta.
    *
@@ -2107,15 +2080,16 @@ class ReplicaManager(val config: KafkaConfig,
    * @param delta           The delta to apply.
    */
   def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
-    // Before taking the lock, build some hash maps that we will need.
-    val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta)
+    // Before taking the lock, compute the local changes
+    val localChanges = delta.localChanges(config.nodeId)
 
     replicaStateChangeLock.synchronized {
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
-      if (!deleted.isEmpty) {
-        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
-        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+      if (!localChanges.deletes.isEmpty) {
+        val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
+        stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
+        stopPartitions(deletes).foreach { case (topicPartition, e) =>
           if (e.isInstanceOf[KafkaStorageException]) {
             stateChangeLogger.error(s"Unable to delete replica ${topicPartition} because " +
               "the local replica for the partition is in an offline log directory")
@@ -2125,15 +2099,16 @@ class ReplicaManager(val config: KafkaConfig,
           }
         }
       }
+
       // Handle partitions which we are now the leader or follower for.
-      if (!newLocalLeaders.isEmpty || !newLocalFollowers.isEmpty) {
+      if (!localChanges.leaders.isEmpty || !localChanges.followers.isEmpty) {
         val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
         val changedPartitions = new mutable.HashSet[Partition]
-        if (!newLocalLeaders.isEmpty) {
-          applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, newLocalLeaders)
+        if (!localChanges.leaders.isEmpty) {
+          applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala)
         }
-        if (!newLocalFollowers.isEmpty) {
-          applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, newLocalFollowers)
+        if (!localChanges.followers.isEmpty) {
+          applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala)
         }
         maybeAddLogDirFetchers(changedPartitions, lazyOffsetCheckpoints,
           name => Option(newImage.topics().getTopic(name)).map(_.id()))
@@ -2148,8 +2123,8 @@ class ReplicaManager(val config: KafkaConfig,
           if (localLog(tp).isEmpty)
             markPartitionOffline(tp)
         }
-        newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded)
-        newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded)
+        localChanges.leaders.keySet.forEach(markPartitionOfflineIfNeeded)
+        localChanges.followers.keySet.forEach(markPartitionOfflineIfNeeded)
 
         replicaFetcherManager.shutdownIdleFetcherThreads()
         replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
@@ -2157,10 +2132,12 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def applyLocalLeadersDelta(changedPartitions: mutable.HashSet[Partition],
-                                     delta: TopicsDelta,
-                                     offsetCheckpoints: OffsetCheckpoints,
-                                     newLocalLeaders: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+  private def applyLocalLeadersDelta(
+    changedPartitions: mutable.Set[Partition],
+    delta: TopicsDelta,
+    offsetCheckpoints: OffsetCheckpoints,
+    newLocalLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
+  ): Unit = {
     stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} partition(s) to " +
       "local leaders.")
     replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet)
@@ -2186,11 +2163,13 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def applyLocalFollowersDelta(changedPartitions: mutable.HashSet[Partition],
-                                       newImage: MetadataImage,
-                                       delta: TopicsDelta,
-                                       offsetCheckpoints: OffsetCheckpoints,
-                                       newLocalFollowers: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+  private def applyLocalFollowersDelta(
+    changedPartitions: mutable.Set[Partition],
+    newImage: MetadataImage,
+    delta: TopicsDelta,
+    offsetCheckpoints: OffsetCheckpoints,
+    newLocalFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
+  ): Unit = {
     stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
       "local followers.")
     val shuttingDown = isShuttingDown.get()
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 3f5ad74..4fe4938 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -152,11 +152,16 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
         // Handle the case where we have new local leaders or followers for the consumer
         // offsets topic.
         getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
-          topicDelta.newLocalLeaders(brokerId).forEach {
-            entry => groupCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
+          val changes = topicDelta.localChanges(brokerId)
+
+          changes.deletes.forEach { topicPartition =>
+            groupCoordinator.onResignation(topicPartition.partition, None)
+          }
+          changes.leaders.forEach { (topicPartition, partitionInfo) =>
+            groupCoordinator.onElection(topicPartition.partition, partitionInfo.partition.leaderEpoch)
           }
-          topicDelta.newLocalFollowers(brokerId).forEach {
-            entry => groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+          changes.followers.forEach { (topicPartition, partitionInfo) =>
+            groupCoordinator.onResignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
           }
         }
 
@@ -172,11 +177,16 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
         // If the transaction state topic changed in a way that's relevant to this broker,
         // notify the transaction coordinator.
         getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
-          topicDelta.newLocalLeaders(brokerId).forEach {
-            entry => txnCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
+          val changes = topicDelta.localChanges(brokerId)
+
+          changes.deletes.forEach { topicPartition =>
+            txnCoordinator.onResignation(topicPartition.partition, None)
+          }
+          changes.leaders.forEach { (topicPartition, partitionInfo) =>
+            txnCoordinator.onElection(topicPartition.partition, partitionInfo.partition.leaderEpoch)
           }
-          topicDelta.newLocalFollowers(brokerId).forEach {
-            entry => txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+          changes.followers.forEach { (topicPartition, partitionInfo) =>
+            txnCoordinator.onResignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
           }
         }
 
@@ -204,7 +214,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
           tag.foreach { t =>
             val newProperties = newImage.configs().configProperties(configResource)
             val maybeDefaultName = configResource.name() match {
-              case "" => ConfigEntityName.Default 
+              case "" => ConfigEntityName.Default
               case k => k
             }
             dynamicConfigHandlers(t).processConfigChanges(maybeDefaultName, newProperties)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 180bf0b..39afbe4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -402,6 +402,16 @@ class KRaftClusterTest {
           }
         }, "Timed out waiting for replica assignments for topic foo. " +
           s"Wanted: ${expectedMapping}. Got: ${currentMapping}")
+
+        checkReplicaManager(
+          cluster,
+          List(
+            (0, List(true, true, false, true)),
+            (1, List(true, true, false, true)),
+            (2, List(true, true, true, true)),
+            (3, List(false, false, true, true))
+          )
+        )
       } finally {
         admin.close()
       }
@@ -410,6 +420,29 @@ class KRaftClusterTest {
     }
   }
 
+  private def checkReplicaManager(cluster: KafkaClusterTestKit, expectedHosting: List[(Int, List[Boolean])]): Unit = {
+    for ((brokerId, partitionsIsHosted) <- expectedHosting) {
+      val broker = cluster.brokers().get(brokerId)
+
+      for ((isHosted, partitionId) <- partitionsIsHosted.zipWithIndex) {
+        val topicPartition = new TopicPartition("foo", partitionId)
+        if (isHosted) {
+          assertNotEquals(
+            HostedPartition.None,
+            broker.replicaManager.getPartition(topicPartition),
+            s"topicPartition = $topicPartition"
+          )
+        } else {
+          assertEquals(
+            HostedPartition.None,
+            broker.replicaManager.getPartition(topicPartition),
+            s"topicPartition = $topicPartition"
+          )
+        }
+      }
+    }
+  }
+
   private def translatePartitionInfoToSeq(partitions: util.List[TopicPartitionInfo]): Seq[Seq[Int]] = {
     partitions.asScala.map(partition => partition.replicas().asScala.map(_.id()).toSeq).toSeq
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 987b7c8..ffa7f77 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
-import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -51,8 +51,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
-import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicImage, TopicsDelta, TopicsImage }
-import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
+import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicsDelta, TopicsImage }
 import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -2780,8 +2779,6 @@ class ReplicaManagerTest {
 
   val BAR_UUID = Uuid.fromString("vApAP6y7Qx23VOfKBzbOBQ")
 
-  val BAZ_UUID = Uuid.fromString("7wVsX2aaTk-bdGcOxLRyVQ")
-
   @Test
   def testGetOrCreatePartition(): Unit = {
     val brokerId = 0
@@ -2799,95 +2796,25 @@ class ReplicaManagerTest {
     assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID))
   }
 
-  val TEST_IMAGE = {
-    val topicsById = new util.HashMap[Uuid, TopicImage]()
-    val topicsByName = new util.HashMap[String, TopicImage]()
-    val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
-    fooPartitions.put(0, new PartitionRegistration(Array(1, 2, 3),
-      Array(1, 2, 3), Replicas.NONE, Replicas.NONE, 1, 100, 200))
-    fooPartitions.put(1, new PartitionRegistration(Array(4, 5, 6),
-      Array(4, 5), Replicas.NONE, Replicas.NONE, 5, 300, 400))
-    val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
-    val barPartitions = new util.HashMap[Integer, PartitionRegistration]()
-    barPartitions.put(0, new PartitionRegistration(Array(2, 3, 4),
-      Array(2, 3, 4), Replicas.NONE, Replicas.NONE, 3, 100, 200))
-    val bar = new TopicImage("bar", BAR_UUID, barPartitions)
-    topicsById.put(FOO_UUID, foo)
-    topicsByName.put("foo", foo)
-    topicsById.put(BAR_UUID, bar)
-    topicsByName.put("bar", bar)
-    new TopicsImage(topicsById, topicsByName)
-  }
-
-  val TEST_DELTA = {
-    val delta = new TopicsDelta(TEST_IMAGE)
-    delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID))
-    delta.replay(new TopicRecord().setName("baz").setTopicId(BAZ_UUID))
-    delta.replay(new PartitionRecord().setPartitionId(0).
-      setTopicId(BAZ_UUID).
-      setReplicas(util.Arrays.asList(1, 2, 4)).
-      setIsr(util.Arrays.asList(1, 2, 4)).
-      setRemovingReplicas(Collections.emptyList()).
-      setAddingReplicas(Collections.emptyList()).
-      setLeader(1).
-      setLeaderEpoch(123).
-      setPartitionEpoch(456))
-    delta.replay(new PartitionRecord().setPartitionId(1).
-      setTopicId(BAZ_UUID).
-      setReplicas(util.Arrays.asList(2, 4, 1)).
-      setIsr(util.Arrays.asList(2, 4, 1)).
-      setRemovingReplicas(Collections.emptyList()).
-      setAddingReplicas(Collections.emptyList()).
-      setLeader(2).
-      setLeaderEpoch(123).
-      setPartitionEpoch(456))
-    delta.replay(new PartitionRecord().setPartitionId(2).
-      setTopicId(BAZ_UUID).
-      setReplicas(util.Arrays.asList(3, 5, 2)).
-      setIsr(util.Arrays.asList(3, 5, 2)).
-      setRemovingReplicas(Collections.emptyList()).
-      setAddingReplicas(Collections.emptyList()).
-      setLeader(3).
-      setLeaderEpoch(456).
-      setPartitionEpoch(789))
-    delta
-  }
-
-  @Test
-  def testCalculateDeltaChanges(): Unit = {
-    val brokerId = 1
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), brokerId)
-    assertEquals((
-      Map(new TopicPartition("foo", 0) -> true,
-        new TopicPartition("foo", 1) -> true),
-      Map(new TopicPartition("baz", 0) -> LocalLeaderInfo(BAZ_UUID,
-        new PartitionRegistration(Array(1, 2, 4), Array(1, 2, 4),
-          Replicas.NONE, Replicas.NONE, 1, 123, 456))),
-      Map(new TopicPartition("baz", 1) -> LocalLeaderInfo(BAZ_UUID,
-        new PartitionRegistration(Array(2, 4, 1), Array(2, 4, 1),
-          Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
-    replicaManager.calculateDeltaChanges(TEST_DELTA))
-  }
-
   @Test
   def testDeltaFromLeaderToFollower(): Unit = {
     val localId = 1
     val otherId = localId + 1
     val numOfRecords = 3
-    val epoch = 100
     val topicPartition = new TopicPartition("foo", 0)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
 
     try {
       // Make the local replica the leader
-      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
-      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
+      val leaderTopicsDelta = topicsCreateDelta(localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
 
       // Check the state of that partition and fetcher
       val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
-      assertEquals(epoch, leaderPartition.getLeaderEpoch)
+      assertEquals(0, leaderPartition.getLeaderEpoch)
 
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
 
@@ -2905,8 +2832,9 @@ class ReplicaManagerTest {
       assertEquals(Errors.NONE, leaderResponse.get.error)
 
       // Change the local replica to follower
-      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
-      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1))
+      val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
 
       // Append on a follower should fail
       val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@@ -2915,7 +2843,7 @@ class ReplicaManagerTest {
       // Check the state of that partition and fetcher
       val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
       assertFalse(followerPartition.isLeader)
-      assertEquals(epoch + 1, followerPartition.getLeaderEpoch)
+      assertEquals(1, followerPartition.getLeaderEpoch)
 
       val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
       assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
@@ -2931,19 +2859,19 @@ class ReplicaManagerTest {
     val localId = 1
     val otherId = localId + 1
     val numOfRecords = 3
-    val epoch = 100
     val topicPartition = new TopicPartition("foo", 0)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
 
     try {
       // Make the local replica the follower
-      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch))
-      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
 
       // Check the state of that partition and fetcher
       val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
       assertFalse(followerPartition.isLeader)
-      assertEquals(epoch, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.getLeaderEpoch)
 
       val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
       assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
@@ -2953,8 +2881,9 @@ class ReplicaManagerTest {
       assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
 
       // Change the local replica to leader
-      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch + 1))
-      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch + 1))
+      val leaderTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
 
       // Send a produce request and advance the highwatermark
       val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@@ -2972,7 +2901,7 @@ class ReplicaManagerTest {
       val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
-      assertEquals(epoch + 1, leaderPartition.getLeaderEpoch)
+      assertEquals(1, leaderPartition.getLeaderEpoch)
 
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
     } finally {
@@ -2986,30 +2915,30 @@ class ReplicaManagerTest {
   def testDeltaFollowerWithNoChange(): Unit = {
     val localId = 1
     val otherId = localId + 1
-    val epoch = 100
     val topicPartition = new TopicPartition("foo", 0)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
 
     try {
       // Make the local replica the follower
-      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch))
-      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
 
       // Check the state of that partition and fetcher
       val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
       assertFalse(followerPartition.isLeader)
-      assertEquals(epoch, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.getLeaderEpoch)
 
       val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
       assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
 
       // Apply the same delta again
-      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
 
       // Check that the state stays the same
       val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition)
       assertFalse(noChangePartition.isLeader)
-      assertEquals(epoch, noChangePartition.getLeaderEpoch)
+      assertEquals(0, noChangePartition.getLeaderEpoch)
 
       val noChangeFetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
       assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), noChangeFetcher.map(_.sourceBroker))
@@ -3021,24 +2950,172 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testDeltaFollowerToNotReplica(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+
+      val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
+
+      // Apply changes that remove replica
+      val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
+      val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
+      replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFollowerRemovedTopic(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+
+      val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
+
+      // Apply changes that remove topic and replica
+      val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
+      val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
+      replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaLeaderToNotReplica(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val leaderTopicsDelta = topicsCreateDelta(localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(0, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+      // Apply changes that remove replica
+      val notReplicaTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
+      val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
+      replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaLeaderToRemovedTopic(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val leaderTopicsDelta = topicsCreateDelta(localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(0, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+      // Apply changes that remove topic and replica
+      val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics())
+      val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
+      replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
   def testDeltaToFollowerCompletesProduce(): Unit = {
     val localId = 1
     val otherId = localId + 1
     val numOfRecords = 3
-    val epoch = 100
     val topicPartition = new TopicPartition("foo", 0)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
 
     try {
       // Make the local replica the leader
-      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
-      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
+      val leaderTopicsDelta = topicsCreateDelta(localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
 
       // Check the state of that partition and fetcher
       val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
-      assertEquals(epoch, leaderPartition.getLeaderEpoch)
+      assertEquals(0, leaderPartition.getLeaderEpoch)
 
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
 
@@ -3046,8 +3123,9 @@ class ReplicaManagerTest {
       val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
 
       // Change the local replica to follower
-      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
-      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1))
+      val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
 
       // Check that the produce failed because it changed to follower before replicating
       assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error)
@@ -3062,20 +3140,20 @@ class ReplicaManagerTest {
   def testDeltaToFollowerCompletesFetch(): Unit = {
     val localId = 1
     val otherId = localId + 1
-    val epoch = 100
     val topicPartition = new TopicPartition("foo", 0)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
 
     try {
       // Make the local replica the leader
-      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
-      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
+      val leaderTopicsDelta = topicsCreateDelta(localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
 
       // Check the state of that partition and fetcher
       val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
       assertTrue(leaderPartition.isLeader)
       assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
-      assertEquals(epoch, leaderPartition.getLeaderEpoch)
+      assertEquals(0, leaderPartition.getLeaderEpoch)
 
       assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
 
@@ -3091,8 +3169,9 @@ class ReplicaManagerTest {
       )
 
       // Change the local replica to follower
-      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
-      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1))
+      val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
 
       // Check that the produce failed because it changed to follower before replicating
       assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error)
@@ -3103,34 +3182,44 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
-  private def topicsImage(replica: Int, isLeader: Boolean, epoch: Int): TopicsImage = {
-    val leader = if (isLeader) replica else replica + 1
-    val topicsById = new util.HashMap[Uuid, TopicImage]()
-    val topicsByName = new util.HashMap[String, TopicImage]()
-    val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
-    fooPartitions.put(0, new PartitionRegistration(Array(replica, replica + 1),
-      Array(replica, replica + 1), Replicas.NONE, Replicas.NONE, leader, epoch, epoch))
-    val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
 
-    topicsById.put(FOO_UUID, foo)
-    topicsByName.put("foo", foo)
+  private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): TopicsDelta = {
+    val leader = if (isStartIdLeader) startId else startId + 1
+    val delta = new TopicsDelta(TopicsImage.EMPTY)
+    delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
+    delta.replay(
+      new PartitionRecord()
+        .setPartitionId(0)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(startId, startId + 1))
+        .setIsr(util.Arrays.asList(startId, startId + 1))
+        .setRemovingReplicas(Collections.emptyList())
+        .setAddingReplicas(Collections.emptyList())
+        .setLeader(leader)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+    )
 
-    new TopicsImage(topicsById, topicsByName)
+    delta
   }
 
-  private def topicsDelta(replica: Int, isLeader: Boolean, epoch: Int): TopicsDelta = {
-    val leader = if (isLeader) replica else replica + 1
-    val delta = new TopicsDelta(TopicsImage.EMPTY)
-    delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
-    delta.replay(new PartitionRecord().setPartitionId(0).
-      setTopicId(FOO_UUID).
-      setReplicas(util.Arrays.asList(replica, replica + 1)).
-      setIsr(util.Arrays.asList(replica, replica + 1)).
-      setRemovingReplicas(Collections.emptyList()).
-      setAddingReplicas(Collections.emptyList()).
-      setLeader(leader).
-      setLeaderEpoch(epoch).
-      setPartitionEpoch(epoch))
+  private def topicsChangeDelta(topicsImage: TopicsImage, startId: Int, isStartIdLeader: Boolean): TopicsDelta = {
+    val leader = if (isStartIdLeader) startId else startId + 1
+    val delta = new TopicsDelta(topicsImage)
+    delta.replay(
+      new PartitionChangeRecord()
+        .setPartitionId(0)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(startId, startId + 1))
+        .setIsr(util.Arrays.asList(startId, startId + 1))
+        .setLeader(leader)
+    )
+    delta
+  }
+
+  private def topicsDeleteDelta(topicsImage: TopicsImage): TopicsDelta = {
+    val delta = new TopicsDelta(topicsImage)
+    delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID))
 
     delta
   }
diff --git a/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
new file mode 100644
index 0000000..3afdd1a
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.Set;
+import java.util.Map;
+
+public final class LocalReplicaChanges {
+    private final Set<TopicPartition> deletes;
+    private final Map<TopicPartition, PartitionInfo> leaders;
+    private final Map<TopicPartition, PartitionInfo> followers;
+
+    LocalReplicaChanges(
+        Set<TopicPartition> deletes,
+        Map<TopicPartition, PartitionInfo> leaders,
+        Map<TopicPartition, PartitionInfo> followers
+    ) {
+        this.deletes = deletes;
+        this.leaders = leaders;
+        this.followers = followers;
+    }
+
+    public Set<TopicPartition> deletes() {
+        return deletes;
+    }
+
+    public Map<TopicPartition, PartitionInfo> leaders() {
+        return leaders;
+    }
+
+    public Map<TopicPartition, PartitionInfo> followers() {
+        return followers;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+            "LocalReplicaChanges(deletes = %s, leaders = %s, followers = %s)",
+            deletes,
+            leaders,
+            followers
+        );
+    }
+
+    public static final class PartitionInfo {
+        private final Uuid topicId;
+        private final PartitionRegistration partition;
+
+        public PartitionInfo(Uuid topicId, PartitionRegistration partition) {
+            this.topicId = topicId;
+            this.partition = partition;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("PartitionInfo(topicId = %s, partition = %s)", topicId, partition);
+        }
+
+        public Uuid topicId() {
+            return topicId;
+        }
+
+        public PartitionRegistration partition() {
+            return partition;
+        }
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 87e4201..88fef30 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -18,17 +18,17 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.Replicas;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.HashSet;
 import java.util.Map.Entry;
-
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Represents changes to a topic in the metadata image.
@@ -93,43 +93,49 @@ public final class TopicDelta {
     }
 
     /**
-     * Find the partitions that we are now leading, whose partition epoch has changed.
+     * Find the partitions that have change based on the replica given.
+     *
+     * The changes identified are:
+     *   1. partitions for which the broker is not a replica anymore
+     *   2. partitions for which the broker is now the leader
+     *   3. partitions for which the broker is now a follower
      *
-     * @param brokerId  The broker id.
-     * @return          A list of (partition ID, partition registration) entries.
+     * @param brokerId the broker id
+     * @return the list of partitions which the broker should remove, become leader or become follower.
      */
-    public List<Entry<Integer, PartitionRegistration>> newLocalLeaders(int brokerId) {
-        List<Entry<Integer, PartitionRegistration>> results = new ArrayList<>();
+    public LocalReplicaChanges localChanges(int brokerId) {
+        Set<TopicPartition> deletes = new HashSet<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
+
         for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
-            if (entry.getValue().leader == brokerId) {
+            if (!Replicas.contains(entry.getValue().replicas, brokerId)) {
                 PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
-                if (prevPartition == null ||
-                        prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
-                    results.add(entry);
+                if (prevPartition != null && Replicas.contains(prevPartition.replicas, brokerId)) {
+                    deletes.add(new TopicPartition(name(), entry.getKey()));
                 }
-            }
-        }
-        return results;
-    }
-
-    /**
-     * Find the partitions that we are now following, whose partition epoch has changed.
-     *
-     * @param brokerId  The broker id.
-     * @return          A list of (partition ID, partition registration) entries.
-     */
-    public List<Entry<Integer, PartitionRegistration>> newLocalFollowers(int brokerId) {
-        List<Entry<Integer, PartitionRegistration>> results = new ArrayList<>();
-        for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
-            if (entry.getValue().leader != brokerId &&
-                    Replicas.contains(entry.getValue().replicas, brokerId)) {
+            } else if (entry.getValue().leader == brokerId) {
+                PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
+                if (prevPartition == null || prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
+                    leaders.put(
+                        new TopicPartition(name(), entry.getKey()),
+                        new LocalReplicaChanges.PartitionInfo(id(), entry.getValue())
+                    );
+                }
+            } else if (
+                entry.getValue().leader != brokerId &&
+                Replicas.contains(entry.getValue().replicas, brokerId)
+            ) {
                 PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
-                if (prevPartition == null ||
-                        prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
-                    results.add(entry);
+                if (prevPartition == null || prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
+                    followers.put(
+                        new TopicPartition(name(), entry.getKey()),
+                        new LocalReplicaChanges.PartitionInfo(id(), entry.getValue())
+                    );
                 }
             }
         }
-        return results;
+
+        return new LocalReplicaChanges(deletes, leaders, followers);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index 111782b..a146bba 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -17,11 +17,13 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.metadata.Replicas;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -162,4 +164,41 @@ public final class TopicsDelta {
     public Set<Uuid> deletedTopicIds() {
         return deletedTopicIds;
     }
+
+    /**
+     * Find the topic partitions that have change based on the replica given.
+     *
+     * The changes identified are:
+     *   1. topic partitions for which the broker is not a replica anymore
+     *   2. topic partitions for which the broker is now the leader
+     *   3. topic partitions for which the broker is now a follower
+     *
+     * @param brokerId the broker id
+     * @return the list of topic partitions which the broker should remove, become leader or become follower.
+     */
+    public LocalReplicaChanges localChanges(int brokerId) {
+        Set<TopicPartition> deletes = new HashSet<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
+
+        for (TopicDelta delta : changedTopics.values()) {
+            LocalReplicaChanges changes = delta.localChanges(brokerId);
+
+            deletes.addAll(changes.deletes());
+            leaders.putAll(changes.leaders());
+            followers.putAll(changes.followers());
+        }
+
+        // Add all of the removed topic partitions to the set of locally removed partitions
+        deletedTopicIds().forEach(topicId -> {
+            TopicImage topicImage = image().getTopic(topicId);
+            topicImage.partitions().forEach((partitionId, prevPartition) -> {
+                if (Replicas.contains(prevPartition.replicas, brokerId)) {
+                    deletes.add(new TopicPartition(topicImage.name(), partitionId));
+                }
+            });
+        });
+
+        return new LocalReplicaChanges(deletes, leaders, followers);
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 7cd2279..1b5fbef 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -34,6 +35,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -46,13 +48,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class TopicsImageTest {
-    final static TopicsImage IMAGE1;
+    static final TopicsImage IMAGE1;
 
     static final List<ApiMessageAndVersion> DELTA1_RECORDS;
 
-    final static TopicsDelta DELTA1;
+    static final TopicsDelta DELTA1;
 
-    final static TopicsImage IMAGE2;
+    static final TopicsImage IMAGE2;
+
+    static final List<TopicImage> TOPIC_IMAGES1;
 
     private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistration... partitions) {
         Map<Integer, PartitionRegistration> partitionMap = new HashMap<>();
@@ -80,16 +84,19 @@ public class TopicsImageTest {
     }
 
     static {
-        List<TopicImage> topics1 = Arrays.asList(
+        TOPIC_IMAGES1 = Arrays.asList(
             newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
-            new PartitionRegistration(new int[] {2, 3, 4},
-                new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
-            new PartitionRegistration(new int[] {3, 4, 5},
-                new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684)),
+                new PartitionRegistration(new int[] {2, 3, 4},
+                    new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
+                new PartitionRegistration(new int[] {3, 4, 5},
+                    new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684),
+                new PartitionRegistration(new int[] {2, 4, 5},
+                    new int[] {2, 4, 5}, Replicas.NONE, Replicas.NONE, 2, 10, 84)),
             newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
                 new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
                     new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345)));
-        IMAGE1 = new TopicsImage(newTopicsByIdMap(topics1), newTopicsByNameMap(topics1));
+
+        IMAGE1 = new TopicsImage(newTopicsByIdMap(TOPIC_IMAGES1), newTopicsByNameMap(TOPIC_IMAGES1));
 
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
@@ -126,6 +133,220 @@ public class TopicsImageTest {
         IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
     }
 
+    private ApiMessageAndVersion newPartitionRecord(Uuid topicId, int partitionId, List<Integer> replicas) {
+        return new ApiMessageAndVersion(
+            new PartitionRecord()
+                .setPartitionId(partitionId)
+                .setTopicId(topicId)
+                .setReplicas(replicas)
+                .setIsr(replicas)
+                .setLeader(replicas.get(0))
+                .setLeaderEpoch(1)
+                .setPartitionEpoch(1),
+            PARTITION_RECORD.highestSupportedVersion()
+        );
+    }
+
+    private PartitionRegistration newPartition(int[] replicas) {
+        return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+    }
+
+    @Test
+    public void testBasicLocalChanges() {
+        int localId = 3;
+        /* Changes already include in DELTA1_RECORDS and IMAGE1:
+         * foo - topic id deleted
+         * bar-0 - stay as follower with different partition epoch
+         * baz-0 - new topic to leader
+         */
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+
+        // Create a new foo topic with a different id
+        Uuid newFooId = Uuid.fromString("b66ybsWIQoygs01vdjH07A");
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new TopicRecord().setName("foo") .setTopicId(newFooId),
+                TOPIC_RECORD.highestSupportedVersion()
+            )
+        );
+        topicRecords.add(newPartitionRecord(newFooId, 0, Arrays.asList(0, 1, 2)));
+        topicRecords.add(newPartitionRecord(newFooId, 1, Arrays.asList(0, 1, localId)));
+
+        // baz-1 - new partion to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionRecord()
+                    .setPartitionId(1)
+                    .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+                    .setReplicas(Arrays.asList(4, 2, localId))
+                    .setIsr(Arrays.asList(4, 2, localId))
+                    .setLeader(4)
+                    .setLeaderEpoch(2)
+                    .setPartitionEpoch(1),
+                PARTITION_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(IMAGE1);
+        RecordTestUtils.replayAll(delta, topicRecords);
+
+        LocalReplicaChanges changes = delta.localChanges(localId);
+        assertEquals(
+            new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1))),
+            changes.deletes()
+        );
+        assertEquals(
+            new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
+            changes.leaders().keySet()
+        );
+        assertEquals(
+            new HashSet<>(
+                Arrays.asList(new TopicPartition("baz", 1), new TopicPartition("bar", 0), new TopicPartition("foo", 1))
+            ),
+            changes.followers().keySet()
+        );
+    }
+
+    @Test
+    public void testDeleteAfterChanges() {
+        int localId = 3;
+        Uuid zooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+        List<TopicImage> topics = new ArrayList<>();
+        topics.add(
+            newTopicImage(
+                "zoo",
+                zooId,
+                newPartition(new int[] {localId, 1, 2})
+            )
+        );
+        TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+        // leader to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord().setTopicId(zooId).setPartitionId(0).setLeader(1),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // remove zoo topic
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new RemoveTopicRecord().setTopicId(zooId),
+                REMOVE_TOPIC_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+
+        LocalReplicaChanges changes = delta.localChanges(localId);
+        assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes());
+        assertEquals(Collections.emptyMap(), changes.leaders());
+        assertEquals(Collections.emptyMap(), changes.followers());
+    }
+
+    @Test
+    public void testLocalReassignmentChanges() {
+        int localId = 3;
+        Uuid zooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+        List<TopicImage> topics = new ArrayList<>();
+        topics.add(
+            newTopicImage(
+                "zoo",
+                zooId,
+                newPartition(new int[] {0, 1, localId}),
+                newPartition(new int[] {localId, 1, 2}),
+                newPartition(new int[] {0, 1, localId}),
+                newPartition(new int[] {localId, 1, 2}),
+                newPartition(new int[] {0, 1, 2}),
+                newPartition(new int[] {0, 1, 2})
+            )
+        );
+        TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+        // zoo-0 - follower to leader
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord().setTopicId(zooId).setPartitionId(0).setLeader(localId),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // zoo-1 - leader to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord().setTopicId(zooId).setPartitionId(1).setLeader(1),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // zoo-2 - follower to removed
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(zooId)
+                  .setPartitionId(2)
+                  .setIsr(Arrays.asList(0, 1, 2))
+                  .setReplicas(Arrays.asList(0, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // zoo-3 - leader to removed
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(zooId)
+                  .setPartitionId(3)
+                  .setLeader(0)
+                  .setIsr(Arrays.asList(0, 1, 2))
+                  .setReplicas(Arrays.asList(0, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // zoo-4 - not replica to leader
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(zooId)
+                  .setPartitionId(4)
+                  .setLeader(localId)
+                  .setIsr(Arrays.asList(localId, 1, 2))
+                  .setReplicas(Arrays.asList(localId, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // zoo-5 - not replica to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(zooId)
+                  .setPartitionId(5)
+                  .setIsr(Arrays.asList(0, 1, localId))
+                  .setReplicas(Arrays.asList(0, 1, localId)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+
+        LocalReplicaChanges changes = delta.localChanges(localId);
+        assertEquals(
+            new HashSet<>(Arrays.asList(new TopicPartition("zoo", 2), new TopicPartition("zoo", 3))),
+            changes.deletes()
+        );
+        assertEquals(
+            new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0), new TopicPartition("zoo", 4))),
+            changes.leaders().keySet()
+        );
+        assertEquals(
+            new HashSet<>(Arrays.asList(new TopicPartition("zoo", 1), new TopicPartition("zoo", 5))),
+            changes.followers().keySet()
+        );
+    }
+
     @Test
     public void testEmptyImageRoundTrip() throws Throwable {
         testToImageAndBack(TopicsImage.EMPTY);