Posted to commits@kafka.apache.org by jg...@apache.org on 2021/03/08 19:28:30 UTC

[kafka] branch 2.8 updated: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received (#10252)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 53cae73  KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received (#10252)
53cae73 is described below

commit 53cae73d552c11a012a1861ffdb69c1d03d3f8a9
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Mar 8 11:21:42 2021 -0800

    KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received (#10252)
    
    This patch implements additional handling logic for `RemoveTopic` records:
    
    - Update `MetadataPartitions` so that deleted partitions are added to the `localRemoved` set (see the sketch after this list)
    - Ensure topic configs are removed from the `ConfigRepository`
    - Propagate deleted partitions to the `GroupCoordinator` so that the corresponding offset commits can be removed
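
    A minimal usage sketch of the partition-removal flow (here `prevImage`,
    `topicId`, and `groupCoordinator` are placeholders; the real call sites
    are in `BrokerMetadataListener.handleRemoveTopicRecord` in the diff below):

        val builder = new MetadataPartitionsBuilder(brokerId = 1, prevImage)
        val removed = builder.removeTopicById(topicId)
        // Replicas hosted by this broker land in `localRemoved`, which
        // downstream components use to delete local partition state.
        builder.localRemoved().foreach { partition =>
          println(s"deleting local state for ${partition.toTopicPartition}")
        }
        // The group coordinator drops offset commits for the deleted partitions.
        groupCoordinator.handleDeletedPartitions(removed.map(_.toTopicPartition).toSeq)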
    
    This patch also changes the controller's topic id generation to use `Uuid.randomUuid` rather than a bare `Random`, as sketched below.
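
    In sketch form (the removed line built ids from a controller-owned
    `Random`; `Uuid.randomUuid` delegates to `java.util.UUID.randomUUID()`):

        // Before (removed in this patch):
        //   new Uuid(random.nextLong(), random.nextLong())
        // After:
        val topicId: Uuid = Uuid.randomUuid()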
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 -
 .../server/metadata/BrokerMetadataListener.scala   |  55 +++---
 .../server/metadata/CachedConfigRepository.scala   |  11 +-
 .../kafka/server/metadata/MetadataPartitions.scala |  95 ++++++++---
 .../server/metadata/MetadataPartitionsTest.scala   | 158 +++++++++++++++--
 .../metadata/BrokerMetadataListenerTest.scala      | 187 +++++++++++++++++++++
 .../apache/kafka/controller/QuorumController.java  |   2 +-
 .../controller/ReplicationControlManager.java      |  10 +-
 .../controller/ReplicationControlManagerTest.java  |   1 -
 9 files changed, 441 insertions(+), 79 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index e7f7a12..0ce1975 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -266,7 +266,6 @@ class BrokerServer(
         groupCoordinator,
         replicaManager,
         transactionCoordinator,
-        logManager,
         threadNamePrefix,
         clientQuotaMetadataManager)
 
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 9c2bcca..8d07f8e 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -20,7 +20,6 @@ import java.util
 import java.util.concurrent.TimeUnit
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{RaftReplicaManager, RequestHandlerHelper}
 import org.apache.kafka.common.config.ConfigResource
@@ -45,7 +44,6 @@ class BrokerMetadataListener(brokerId: Int,
                              groupCoordinator: GroupCoordinator,
                              replicaManager: RaftReplicaManager,
                              txnCoordinator: TransactionCoordinator,
-                             logManager: LogManager,
                              threadNamePrefix: Option[String],
                              clientQuotaManager: ClientQuotaMetadataManager
                             ) extends MetaLogListener with KafkaMetricsGroup {
@@ -79,6 +77,11 @@ class BrokerMetadataListener(brokerId: Int,
     eventQueue.append(new HandleCommitsEvent(lastOffset, records))
   }
 
+  // Visible for testing. It's useful to execute events synchronously
+  private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
+    new HandleCommitsEvent(lastOffset, records).run()
+  }
+
   class HandleCommitsEvent(lastOffset: Long,
                            records: util.List[ApiMessage])
       extends EventQueue.FailureLoggingEvent(log) {
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
       case e: Exception => throw new RuntimeException("Unknown metadata record type " +
       s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
     }
-    recordType match {
-      case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[RegisterBrokerRecord])
-      case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnregisterBrokerRecord])
-      case TOPIC_RECORD => handleTopicRecord(imageBuilder,
-        record.asInstanceOf[TopicRecord])
-      case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
-        record.asInstanceOf[PartitionRecord])
-      case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
-      case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
-        record.asInstanceOf[PartitionChangeRecord])
-      case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[FenceBrokerRecord])
-      case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnfenceBrokerRecord])
-      case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
-        record.asInstanceOf[RemoveTopicRecord])
-      case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
-        record.asInstanceOf[QuotaRecord])
-      // TODO: handle FEATURE_LEVEL_RECORD
-      case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")
+
+    record match {
+      case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
+      case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
+      case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
+      case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
+      case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
+      case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)
+      case rec: PartitionChangeRecord => handlePartitionChangeRecord(imageBuilder, rec)
+      case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec)
+      case rec: ConfigRecord => handleConfigRecord(rec)
+      case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec)
+      case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType")
     }
   }
 
@@ -222,9 +216,16 @@ class BrokerMetadataListener(brokerId: Int,
 
   def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
                               record: RemoveTopicRecord): Unit = {
-    val removedPartitions = imageBuilder.partitionsBuilder().
-      removeTopicById(record.topicId())
-    groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+    imageBuilder.topicIdToName(record.topicId()) match {
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")
+
+      case Some(topicName) =>
+        info(s"Processing deletion of topic $topicName with id ${record.topicId}")
+        val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
+        groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+        configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))
+    }
   }
 
   def handleQuotaRecord(imageBuilder: MetadataImageBuilder,
diff --git a/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
index 2b52106..4c5257d 100644
--- a/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
@@ -96,11 +96,16 @@ class CachedConfigRepository extends ConfigRepository {
 
   override def config(configResource: ConfigResource): Properties = {
     val properties = new Properties()
-    Option(configMap.get(configResource)).foreach {
-      _.entrySet().iterator().asScala.foreach { case e =>
-        properties.put(e.getKey, e.getValue)
+    Option(configMap.get(configResource)).foreach { resourceConfigMap =>
+      resourceConfigMap.entrySet.iterator.asScala.foreach { entry =>
+        properties.put(entry.getKey, entry.getValue)
       }
     }
     properties
   }
+
+  def remove(configResource: ConfigResource): Unit = {
+    configMap.remove(configResource)
+  }
+
 }
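
A minimal sketch of the new removal path (the `setConfig` setter is assumed from
the cache's existing API and is not part of this diff; only `remove` is new):

    import kafka.server.metadata.CachedConfigRepository
    import org.apache.kafka.common.config.ConfigResource

    val repo = new CachedConfigRepository
    val resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
    repo.setConfig(resource, "cleanup.policy", "compact") // assumed setter
    repo.remove(resource)                                 // added by this patch
    assert(repo.config(resource).isEmpty)                 // lookup now yields an empty Properties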
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
index 96ed8a5..d6b3b1b 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
@@ -116,21 +116,48 @@ class MetadataPartitionsBuilder(val brokerId: Int,
                                 val prevPartitions: MetadataPartitions) {
   private var newNameMap = prevPartitions.copyNameMap()
   private var newIdMap = prevPartitions.copyIdMap()
+  private var newReverseIdMap = prevPartitions.copyReverseIdMap()
   private val changed = Collections.newSetFromMap[Any](new util.IdentityHashMap())
   private val _localChanged = new util.HashSet[MetadataPartition]
   private val _localRemoved = new util.HashSet[MetadataPartition]
 
   def topicIdToName(id: Uuid): Option[String] = Option(newIdMap.get(id))
 
+  def topicNameToId(name: String): Option[Uuid] = Option(newReverseIdMap.get(name))
+
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
-    Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+    val name = Option(newIdMap.remove(id)).getOrElse {
+      throw new RuntimeException(s"Unable to locate topic with ID $id")
+    }
+
+    newReverseIdMap.remove(name)
+
+    val prevPartitionMap = newNameMap.remove(name)
+    if (prevPartitionMap == null) {
+      Seq.empty
+    } else {
+      changed.remove(prevPartitionMap)
+
+      val removedPartitions = prevPartitionMap.values
+      if (prevImageHasTopicId(id)) {
+        removedPartitions.forEach { partition =>
+          if (partition.isReplicaFor(brokerId)) {
+            _localRemoved.add(partition)
+          }
+        }
+      } else {
+        removedPartitions.forEach { partition =>
+          if (partition.isReplicaFor(brokerId)) {
+            _localChanged.remove(partition)
+          }
+        }
+      }
+      removedPartitions.asScala
     }
   }
 
   def handleChange(record: PartitionChangeRecord): Unit = {
-    Option(newIdMap.get(record.topicId())) match {
+    topicIdToName(record.topicId) match {
       case None => throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId()}")
       case Some(name) => Option(newNameMap.get(name)) match {
         case None => throw new RuntimeException(s"Unable to locate topic with name $name")
@@ -144,10 +171,14 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def addUuidMapping(name: String, id: Uuid): Unit = {
     newIdMap.put(id, name)
+    newReverseIdMap.put(name, id)
   }
 
   def removeUuidMapping(id: Uuid): Unit = {
-    newIdMap.remove(id)
+    val topicName = newIdMap.remove(id)
+    if (topicName != null) {
+      newReverseIdMap.remove(topicName)
+    }
   }
 
   def get(topicName: String, partitionId: Int): Option[MetadataPartition] = {
@@ -171,42 +202,58 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)
+      val prevImageHasTopic = if (currentTopicId != null) {
+        prevImageHasTopicId(currentTopicId)
+      } else {
+        prevPartitions.allTopicNames().contains(partition.topicName)
+      }
+
+      if (prevImageHasTopic) {
+        _localRemoved.add(partition)
+      }
+    }
+  }
+
+  private def prevImageHasTopicId(topicId: Uuid): Boolean = {
+    prevPartitions.topicIdToName(topicId).isDefined
+  }
+
   def remove(topicName: String, partitionId: Int): Unit = {
     val prevPartitionMap = newNameMap.get(topicName)
     if (prevPartitionMap != null) {
-      if (changed.contains(prevPartitionMap)) {
-        val prevPartition = prevPartitionMap.remove(partitionId)
-        if (prevPartition.isReplicaFor(brokerId)) {
-          _localRemoved.add(prevPartition)
-        }
+      val removedPartition = if (changed.contains(prevPartitionMap)) {
+        Option(prevPartitionMap.remove(partitionId))
       } else {
-        Option(prevPartitionMap.get(partitionId)).foreach { prevPartition =>
-          if (prevPartition.isReplicaFor(brokerId)) {
-            _localRemoved.add(prevPartition)
-          }
+        Option(prevPartitionMap.get(partitionId)).map { prevPartition =>
           val newPartitionMap = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() - 1)
           prevPartitionMap.forEach { (prevPartitionId, prevPartition) =>
-            if (!prevPartitionId.equals(partitionId)) {
+            if (prevPartitionId != partitionId) {
               newPartitionMap.put(prevPartitionId, prevPartition)
             }
           }
           changed.add(newPartitionMap)
           newNameMap.put(topicName, newPartitionMap)
+          prevPartition
         }
       }
+      removedPartition.foreach(maybeAddToLocalRemoved)
     }
   }
 
   def build(): MetadataPartitions = {
-    val result = MetadataPartitions(newNameMap, newIdMap)
+    val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)
     newNameMap = Collections.unmodifiableMap(newNameMap)
     newIdMap = Collections.unmodifiableMap(newIdMap)
+    newReverseIdMap = Collections.unmodifiableMap(newReverseIdMap)
     result
   }
 
@@ -232,15 +279,15 @@ case class MetadataPartitions(private val nameMap: util.Map[String, util.Map[Int
   def topicNameToId(name: String): Option[Uuid] = Option(reverseIdMap.get(name))
 
   def copyNameMap(): util.Map[String, util.Map[Int, MetadataPartition]] = {
-    val copy = new util.HashMap[String, util.Map[Int, MetadataPartition]](nameMap.size())
-    copy.putAll(nameMap)
-    copy
+    new util.HashMap(nameMap)
   }
 
   def copyIdMap(): util.Map[Uuid, String] = {
-    val copy = new util.HashMap[Uuid, String](idMap.size())
-    copy.putAll(idMap)
-    copy
+    new util.HashMap(idMap)
+  }
+
+  def copyReverseIdMap(): util.Map[String, Uuid] = {
+    new util.HashMap(reverseIdMap)
   }
 
   def allPartitions(): Iterator[MetadataPartition] = new AllPartitionsIterator(nameMap).asScala
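
The builder now maintains a name-to-id map in lockstep with the id-to-name map.
A small sketch of the invariant, assuming `emptyPartitions` is an empty
MetadataPartitions image as in the tests below:

    import org.apache.kafka.common.Uuid

    val builder = new MetadataPartitionsBuilder(brokerId = 0, emptyPartitions)
    val id = Uuid.randomUuid()
    builder.addUuidMapping("foo", id) // updates both the id->name and name->id maps
    assert(builder.topicIdToName(id).contains("foo"))
    assert(builder.topicNameToId("foo").contains(id))
    builder.removeUuidMapping(id)     // removes the entry from both maps
    assert(builder.topicNameToId("foo").isEmpty)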
diff --git a/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
index a708d73..fcd0925 100644
--- a/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
+++ b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
@@ -18,13 +18,12 @@
 package kafka.server.metadata
 
 import java.util.Collections
-
-import org.apache.kafka.common.Uuid
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Test, Timeout}
 import java.util.concurrent.TimeUnit
 
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.metadata.PartitionChangeRecord
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -38,9 +37,12 @@ class MetadataPartitionsTest {
   private def newPartition(topicName: String,
                            partitionIndex: Int,
                            replicas: Option[Seq[Int]] = None,
-                           isr: Option[Seq[Int]] = None): MetadataPartition = {
-    val effectiveReplicas = asJavaList(replicas
-      .getOrElse(List(partitionIndex, partitionIndex + 1, partitionIndex + 2)))
+                           isr: Option[Seq[Int]] = None,
+                           numBrokers: Int = 6): MetadataPartition = {
+    val effectiveReplicas = asJavaList(replicas.getOrElse {
+      val preferredLeaderId = partitionIndex % numBrokers
+      List(preferredLeaderId, preferredLeaderId + 1, preferredLeaderId + 2)
+    })
 
     val effectiveIsr = isr match {
       case None => effectiveReplicas
@@ -127,12 +129,9 @@ class MetadataPartitionsTest {
   @Test
   def testAllTopicNames(): Unit = {
     val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
-    builder.set(newPartition("foo", 0))
-    builder.set(newPartition("foo", 1))
-    builder.set(newPartition("foo", 2))
-    builder.set(newPartition("bar", 0))
-    builder.set(newPartition("baz", 0))
-    builder.set(newPartition("baz", 1))
+    createTopic("foo", numPartitions = 3, builder)
+    createTopic("bar", numPartitions = 2, builder)
+    createTopic("baz", numPartitions = 3, builder)
     val image = builder.build()
     val expectedTopicNames = new mutable.HashSet[String]()
     expectedTopicNames += "foo"
@@ -185,6 +184,139 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    builder.removeTopicById(topicId)
+    assertEquals(None, builder.topicIdToName(topicId))
+    assertEquals(None, builder.topicNameToId(topic))
+    assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+    assertEquals(Set.empty, localRemoved(builder))
+    assertEquals(Set.empty, localChanged(builder))
+
+    val metadata = builder.build()
+    assertEquals(Set.empty, metadata.allTopicNames())
+    assertEquals(None, metadata.topicIdToName(topicId))
+    assertEquals(None, metadata.topicNameToId(topic))
+    assertEquals(Set.empty, metadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicRemoval(): Unit = {
+    val brokerId = 0
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val createBuilder = new MetadataPartitionsBuilder(brokerId, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, createBuilder)
+    val localTopicPartitions = localChanged(createBuilder)
+    val createMetadata = createBuilder.build()
+
+    assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, createMetadata)
+    deleteBuilder.removeTopicById(topicId)
+    assertEquals(None, deleteBuilder.topicIdToName(topicId))
+    assertEquals(None, deleteBuilder.topicNameToId(topic))
+    assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions))
+    assertEquals(localTopicPartitions, localRemoved(deleteBuilder))
+    assertEquals(Set.empty, localChanged(deleteBuilder))
+
+    val deleteMetadata = deleteBuilder.build()
+    assertEquals(Set.empty, deleteMetadata.allTopicNames())
+    assertEquals(None, deleteMetadata.topicIdToName(topicId))
+    assertEquals(None, deleteMetadata.topicNameToId(topic))
+    assertEquals(Set.empty, deleteMetadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicDeleteAndRecreate(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val initialBuilder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val initialTopicId = createTopic(topic, numPartitions, initialBuilder)
+    val initialLocalTopicPartitions = initialBuilder.localChanged().map(_.toTopicPartition).toSet
+    val initialMetadata = initialBuilder.build()
+
+    val recreateBuilder = new MetadataPartitionsBuilder(brokerId = 0, initialMetadata)
+    recreateBuilder.removeTopicById(initialTopicId)
+    assertEquals(initialLocalTopicPartitions, localRemoved(recreateBuilder))
+
+    val recreatedNumPartitions = 10
+    val recreatedTopicId = createTopic(topic, recreatedNumPartitions, recreateBuilder)
+    val recreatedTopicPartitions = (0 until recreatedNumPartitions).map(new TopicPartition(topic, _)).toSet
+    val recreatedLocalTopicPartitions = localChanged(recreateBuilder)
+
+    assertTrue(recreatedLocalTopicPartitions.nonEmpty)
+    assertNotEquals(recreatedLocalTopicPartitions, recreatedTopicPartitions)
+    assertTrue(recreatedLocalTopicPartitions.subsetOf(recreatedTopicPartitions))
+    assertFalse(recreatedLocalTopicPartitions.subsetOf(initialLocalTopicPartitions))
+    assertEquals(Some(topic), recreateBuilder.topicIdToName(recreatedTopicId))
+    assertEquals(Some(recreatedTopicId), recreateBuilder.topicNameToId(topic))
+    assertEquals(recreatedTopicPartitions, filterPartitions(recreateBuilder, recreatedTopicPartitions))
+    assertEquals(initialLocalTopicPartitions, localRemoved(recreateBuilder))
+
+    val recreatedMetadata = recreateBuilder.build()
+    assertEquals(recreatedTopicPartitions, filterPartitions(recreatedMetadata, topic))
+    assertEquals(Some(recreatedTopicId), recreatedMetadata.topicNameToId(topic))
+    assertEquals(Some(topic), recreatedMetadata.topicIdToName(recreatedTopicId))
+  }
+
+  private def localRemoved(
+    builder: MetadataPartitionsBuilder
+  ): Set[TopicPartition] = {
+    builder.localRemoved().toSet[MetadataPartition].map(_.toTopicPartition)
+  }
+
+  private def localChanged(
+    builder: MetadataPartitionsBuilder
+  ): Set[TopicPartition] = {
+    builder.localChanged().toSet[MetadataPartition].map(_.toTopicPartition)
+  }
+
+  private def filterPartitions(
+    metadata: MetadataPartitions,
+    topic: String
+  ): Set[TopicPartition] = {
+    metadata.topicPartitions(topic).map(_.toTopicPartition).toSet
+  }
+
+  private def filterPartitions(
+    builder: MetadataPartitionsBuilder,
+    topicPartitions: Set[TopicPartition]
+  ): Set[TopicPartition] = {
+    topicPartitions.filter { topicPartition =>
+      builder.get(topicPartition.topic, topicPartition.partition).isDefined
+    }
+  }
+
+  private def createTopic(
+    topic: String,
+    numPartitions: Int,
+    builder: MetadataPartitionsBuilder
+  ): Uuid = {
+    val topicId = Uuid.randomUuid()
+    builder.addUuidMapping(topic, topicId)
+
+    (0 until numPartitions).foreach { partition =>
+      builder.set(newPartition(topic, partition))
+    }
+
+    topicId
+  }
+
   private def asJavaList(replicas: Iterable[Int]): java.util.List[Integer] = {
     replicas.map(Int.box).toList.asJava
   }
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
new file mode 100644
index 0000000..545fe48
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.Properties
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.LogConfig
+import kafka.server.RaftReplicaManager
+import kafka.utils.Implicits._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
+import org.apache.kafka.common.protocol.ApiMessage
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+class BrokerMetadataListenerTest {
+
+  private val brokerId = 1
+  private val time = new MockTime()
+  private val configRepository = new CachedConfigRepository
+  private val metadataCache = new RaftMetadataCache(brokerId)
+  private val groupCoordinator = mock(classOf[GroupCoordinator])
+  private val replicaManager = mock(classOf[RaftReplicaManager])
+  private val txnCoordinator = mock(classOf[TransactionCoordinator])
+  private val clientQuotaManager = mock(classOf[ClientQuotaMetadataManager])
+  private var lastMetadataOffset = 0L
+
+  private val listener = new BrokerMetadataListener(
+    brokerId,
+    time,
+    metadataCache,
+    configRepository,
+    groupCoordinator,
+    replicaManager,
+    txnCoordinator,
+    threadNamePrefix = None,
+    clientQuotaManager
+  )
+
+  @Test
+  def testTopicCreationAndDeletion(): Unit = {
+    val topicId = Uuid.randomUuid()
+    val topic = "foo"
+    val numPartitions = 10
+    val config = Map(
+      LogConfig.CleanupPolicyProp -> LogConfig.Compact,
+      LogConfig.MaxCompactionLagMsProp -> "5000"
+    )
+    val localPartitions = createAndAssert(topicId, topic, config, numPartitions, numBrokers = 4)
+    deleteTopic(topicId, topic, numPartitions, localPartitions)
+  }
+
+  private def deleteTopic(
+    topicId: Uuid,
+    topic: String,
+    numPartitions: Int,
+    localPartitions: Set[TopicPartition]
+  ): Unit = {
+    val deleteRecord = new RemoveTopicRecord()
+      .setTopicId(topicId)
+    lastMetadataOffset += 1
+    listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessage](
+      deleteRecord,
+    ).asJava)
+
+    assertFalse(metadataCache.contains(topic))
+    assertEquals(new Properties, configRepository.topicConfig(topic))
+
+    verify(groupCoordinator).handleDeletedPartitions(ArgumentMatchers.argThat[Seq[TopicPartition]] { partitions =>
+      partitions.toSet == partitionSet(topic, numPartitions)
+    })
+
+    val deleteImageCapture: ArgumentCaptor[MetadataImageBuilder] =
+      ArgumentCaptor.forClass(classOf[MetadataImageBuilder])
+    verify(replicaManager).handleMetadataRecords(
+      deleteImageCapture.capture(),
+      ArgumentMatchers.eq(lastMetadataOffset),
+      any()
+    )
+
+    val deleteImage = deleteImageCapture.getValue
+    assertTrue(deleteImage.hasPartitionChanges)
+    val localRemoved = deleteImage.partitionsBuilder().localRemoved()
+    assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
+  }
+
+  private def createAndAssert(
+    topicId: Uuid,
+    topic: String,
+    topicConfig: Map[String, String],
+    numPartitions: Int,
+    numBrokers: Int
+  ): Set[TopicPartition] = {
+    val records = new java.util.ArrayList[ApiMessage]
+    records.add(new TopicRecord()
+      .setName(topic)
+      .setTopicId(topicId)
+    )
+
+    val localTopicPartitions = mutable.Set.empty[TopicPartition]
+    (0 until numPartitions).map { partitionId =>
+      val preferredLeaderId = partitionId % numBrokers
+      val replicas = asJavaList(Seq(
+        preferredLeaderId,
+        preferredLeaderId + 1,
+        preferredLeaderId + 2
+      ))
+
+      if (replicas.contains(brokerId)) {
+        localTopicPartitions.add(new TopicPartition(topic, partitionId))
+      }
+
+      records.add(new PartitionRecord()
+        .setTopicId(topicId)
+        .setPartitionId(partitionId)
+        .setLeader(preferredLeaderId)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+        .setReplicas(replicas)
+        .setIsr(replicas)
+      )
+    }
+
+    topicConfig.forKeyValue { (key, value) =>
+      records.add(new ConfigRecord()
+        .setResourceName(topic)
+        .setResourceType(ConfigResource.Type.TOPIC.id())
+        .setName(key)
+        .setValue(value)
+      )
+    }
+
+    lastMetadataOffset += records.size()
+    listener.execCommits(lastOffset = lastMetadataOffset, records)
+    assertTrue(metadataCache.contains(topic))
+    assertEquals(Some(numPartitions), metadataCache.numPartitions(topic))
+    assertEquals(topicConfig, configRepository.topicConfig(topic).asScala)
+
+    val imageCapture: ArgumentCaptor[MetadataImageBuilder] =
+      ArgumentCaptor.forClass(classOf[MetadataImageBuilder])
+    verify(replicaManager).handleMetadataRecords(
+      imageCapture.capture(),
+      ArgumentMatchers.eq(lastMetadataOffset),
+      any()
+    )
+
+    val createImage = imageCapture.getValue
+    assertTrue(createImage.hasPartitionChanges)
+    val localChanged = createImage.partitionsBuilder().localChanged()
+    assertEquals(localTopicPartitions, localChanged.map(_.toTopicPartition).toSet)
+
+    localTopicPartitions.toSet
+  }
+
+  private def partitionSet(topic: String, numPartitions: Int): Set[TopicPartition] = {
+    (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+  }
+
+  private def asJavaList(replicas: Iterable[Int]): java.util.List[Integer] = {
+    replicas.map(Int.box).toList.asJava
+  }
+
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 6ee1b7e..eb73eef 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -790,7 +790,7 @@ public final class QuorumController implements Controller {
             snapshotRegistry, sessionTimeoutNs, replicaPlacementPolicy);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.replicationControl = new ReplicationControlManager(snapshotRegistry,
-            logContext, new Random(), defaultReplicationFactor, defaultNumPartitions,
+            logContext, defaultReplicationFactor, defaultNumPartitions,
             configurationControl, clusterControl);
         this.logManager = logManager;
         this.metaLogListener = new QuorumMetaLogListener();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 4a58b3a..66b7374 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -72,7 +72,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.Random;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -246,11 +245,6 @@ public class ReplicationControlManager {
     private final Logger log;
 
     /**
-     * The random number generator used by this object.
-     */
-    private final Random random;
-
-    /**
      * The KIP-464 default replication factor that is used if a CreateTopics request does
      * not specify one.
      */
@@ -289,14 +283,12 @@ public class ReplicationControlManager {
 
     ReplicationControlManager(SnapshotRegistry snapshotRegistry,
                               LogContext logContext,
-                              Random random,
                               short defaultReplicationFactor,
                               int defaultNumPartitions,
                               ConfigurationControlManager configurationControl,
                               ClusterControlManager clusterControl) {
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(ReplicationControlManager.class);
-        this.random = random;
         this.defaultReplicationFactor = defaultReplicationFactor;
         this.defaultNumPartitions = defaultNumPartitions;
         this.configurationControl = configurationControl;
@@ -516,7 +508,7 @@ public class ReplicationControlManager {
                         " times: " + e.getMessage());
             }
         }
-        Uuid topicId = new Uuid(random.nextLong(), random.nextLong());
+        Uuid topicId = Uuid.randomUuid();
         successes.put(topic.name(), new CreatableTopicResult().
             setName(topic.name()).
             setTopicId(topicId).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 8b00c10..fa7fc00 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -84,7 +84,6 @@ public class ReplicationControlManagerTest {
             new LogContext(), snapshotRegistry, Collections.emptyMap());
         final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry,
             new LogContext(),
-            random,
             (short) 3,
             1,
             configurationControl,