You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/03/08 19:28:30 UTC
[kafka] branch 2.8 updated: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received (#10252)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 53cae73 KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received (#10252)
53cae73 is described below
commit 53cae73d552c11a012a1861ffdb69c1d03d3f8a9
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Mar 8 11:21:42 2021 -0800
KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received (#10252)
This patch implements additional handling logic for `RemoveTopicRecord`:
- Update `MetadataPartitions` to ensure that partitions of a deleted topic which are replicated on the local broker are added to the `localRemoved` set
- Ensure topic configs are removed from `ConfigRepository`
- Propagate deleted partitions to `GroupCoordinator` so that corresponding offset commits can be removed
This patch also changes the controller topic id generation logic to use `Uuid.randomUuid` rather than `Random`.
Reviewers: Ismael Juma <is...@juma.me.uk>, Chia-Ping Tsai <ch...@gmail.com>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 1 -
.../server/metadata/BrokerMetadataListener.scala | 55 +++---
.../server/metadata/CachedConfigRepository.scala | 11 +-
.../kafka/server/metadata/MetadataPartitions.scala | 95 ++++++++---
.../server/metadata/MetadataPartitionsTest.scala | 158 +++++++++++++++--
.../metadata/BrokerMetadataListenerTest.scala | 187 +++++++++++++++++++++
.../apache/kafka/controller/QuorumController.java | 2 +-
.../controller/ReplicationControlManager.java | 10 +-
.../controller/ReplicationControlManagerTest.java | 1 -
9 files changed, 441 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index e7f7a12..0ce1975 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -266,7 +266,6 @@ class BrokerServer(
groupCoordinator,
replicaManager,
transactionCoordinator,
- logManager,
threadNamePrefix,
clientQuotaMetadataManager)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 9c2bcca..8d07f8e 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -20,7 +20,6 @@ import java.util
import java.util.concurrent.TimeUnit
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{RaftReplicaManager, RequestHandlerHelper}
import org.apache.kafka.common.config.ConfigResource
@@ -45,7 +44,6 @@ class BrokerMetadataListener(brokerId: Int,
groupCoordinator: GroupCoordinator,
replicaManager: RaftReplicaManager,
txnCoordinator: TransactionCoordinator,
- logManager: LogManager,
threadNamePrefix: Option[String],
clientQuotaManager: ClientQuotaMetadataManager
) extends MetaLogListener with KafkaMetricsGroup {
@@ -79,6 +77,11 @@ class BrokerMetadataListener(brokerId: Int,
eventQueue.append(new HandleCommitsEvent(lastOffset, records))
}
+ // Visible for testing. It's useful to execute events synchronously
+ private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
+ new HandleCommitsEvent(lastOffset, records).run()
+ }
+
class HandleCommitsEvent(lastOffset: Long,
records: util.List[ApiMessage])
extends EventQueue.FailureLoggingEvent(log) {
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
case e: Exception => throw new RuntimeException("Unknown metadata record type " +
s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
}
- recordType match {
- case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
- record.asInstanceOf[RegisterBrokerRecord])
- case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
- record.asInstanceOf[UnregisterBrokerRecord])
- case TOPIC_RECORD => handleTopicRecord(imageBuilder,
- record.asInstanceOf[TopicRecord])
- case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
- record.asInstanceOf[PartitionRecord])
- case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
- case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
- record.asInstanceOf[PartitionChangeRecord])
- case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
- record.asInstanceOf[FenceBrokerRecord])
- case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
- record.asInstanceOf[UnfenceBrokerRecord])
- case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
- record.asInstanceOf[RemoveTopicRecord])
- case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
- record.asInstanceOf[QuotaRecord])
- // TODO: handle FEATURE_LEVEL_RECORD
- case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")
+
+ record match {
+ case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
+ case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
+ case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
+ case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
+ case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
+ case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)
+ case rec: PartitionChangeRecord => handlePartitionChangeRecord(imageBuilder, rec)
+ case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec)
+ case rec: ConfigRecord => handleConfigRecord(rec)
+ case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec)
+ case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType")
}
}
@@ -222,9 +216,16 @@ class BrokerMetadataListener(brokerId: Int,
def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
record: RemoveTopicRecord): Unit = {
- val removedPartitions = imageBuilder.partitionsBuilder().
- removeTopicById(record.topicId())
- groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+ imageBuilder.topicIdToName(record.topicId()) match {
+ case None =>
+ throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")
+
+ case Some(topicName) =>
+ info(s"Processing deletion of topic $topicName with id ${record.topicId}")
+ val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
+ groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+ configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))
+ }
}
def handleQuotaRecord(imageBuilder: MetadataImageBuilder,
diff --git a/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
index 2b52106..4c5257d 100644
--- a/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
@@ -96,11 +96,16 @@ class CachedConfigRepository extends ConfigRepository {
override def config(configResource: ConfigResource): Properties = {
val properties = new Properties()
- Option(configMap.get(configResource)).foreach {
- _.entrySet().iterator().asScala.foreach { case e =>
- properties.put(e.getKey, e.getValue)
+ Option(configMap.get(configResource)).foreach { resourceConfigMap =>
+ resourceConfigMap.entrySet.iterator.asScala.foreach { entry =>
+ properties.put(entry.getKey, entry.getValue)
}
}
properties
}
+
+ def remove(configResource: ConfigResource): Unit = {
+ configMap.remove(configResource)
+ }
+
}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
index 96ed8a5..d6b3b1b 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
@@ -116,21 +116,48 @@ class MetadataPartitionsBuilder(val brokerId: Int,
val prevPartitions: MetadataPartitions) {
private var newNameMap = prevPartitions.copyNameMap()
private var newIdMap = prevPartitions.copyIdMap()
+ private var newReverseIdMap = prevPartitions.copyReverseIdMap()
private val changed = Collections.newSetFromMap[Any](new util.IdentityHashMap())
private val _localChanged = new util.HashSet[MetadataPartition]
private val _localRemoved = new util.HashSet[MetadataPartition]
def topicIdToName(id: Uuid): Option[String] = Option(newIdMap.get(id))
+ def topicNameToId(name: String): Option[Uuid] = Option(newReverseIdMap.get(name))
+
def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
- Option(newIdMap.remove(id)) match {
- case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
- case Some(name) => newNameMap.remove(name).values().asScala
+ val name = Option(newIdMap.remove(id)).getOrElse {
+ throw new RuntimeException(s"Unable to locate topic with ID $id")
+ }
+
+ newReverseIdMap.remove(name)
+
+ val prevPartitionMap = newNameMap.remove(name)
+ if (prevPartitionMap == null) {
+ Seq.empty
+ } else {
+ changed.remove(prevPartitionMap)
+
+ val removedPartitions = prevPartitionMap.values
+ if (prevImageHasTopicId(id)) {
+ removedPartitions.forEach { partition =>
+ if (partition.isReplicaFor(brokerId)) {
+ _localRemoved.add(partition)
+ }
+ }
+ } else {
+ removedPartitions.forEach { partition =>
+ if (partition.isReplicaFor(brokerId)) {
+ _localChanged.remove(partition)
+ }
+ }
+ }
+ removedPartitions.asScala
}
}
def handleChange(record: PartitionChangeRecord): Unit = {
- Option(newIdMap.get(record.topicId())) match {
+ topicIdToName(record.topicId) match {
case None => throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId()}")
case Some(name) => Option(newNameMap.get(name)) match {
case None => throw new RuntimeException(s"Unable to locate topic with name $name")
@@ -144,10 +171,14 @@ class MetadataPartitionsBuilder(val brokerId: Int,
def addUuidMapping(name: String, id: Uuid): Unit = {
newIdMap.put(id, name)
+ newReverseIdMap.put(name, id)
}
def removeUuidMapping(id: Uuid): Unit = {
- newIdMap.remove(id)
+ val topicName = newIdMap.remove(id)
+ if (topicName != null) {
+ newReverseIdMap.remove(topicName)
+ }
}
def get(topicName: String, partitionId: Int): Option[MetadataPartition] = {
@@ -171,42 +202,58 @@ class MetadataPartitionsBuilder(val brokerId: Int,
val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
if (partition.isReplicaFor(brokerId)) {
_localChanged.add(partition)
- } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
- _localRemoved.add(prevPartition)
+ } else if (prevPartition != null) {
+ maybeAddToLocalRemoved(prevPartition)
}
newNameMap.put(partition.topicName, newPartitionMap)
}
+ private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+ if (partition.isReplicaFor(brokerId)) {
+ val currentTopicId = newReverseIdMap.get(partition.topicName)
+ val prevImageHasTopic = if (currentTopicId != null) {
+ prevImageHasTopicId(currentTopicId)
+ } else {
+ prevPartitions.allTopicNames().contains(partition.topicName)
+ }
+
+ if (prevImageHasTopic) {
+ _localRemoved.add(partition)
+ }
+ }
+ }
+
+ private def prevImageHasTopicId(topicId: Uuid): Boolean = {
+ prevPartitions.topicIdToName(topicId).isDefined
+ }
+
def remove(topicName: String, partitionId: Int): Unit = {
val prevPartitionMap = newNameMap.get(topicName)
if (prevPartitionMap != null) {
- if (changed.contains(prevPartitionMap)) {
- val prevPartition = prevPartitionMap.remove(partitionId)
- if (prevPartition.isReplicaFor(brokerId)) {
- _localRemoved.add(prevPartition)
- }
+ val removedPartition = if (changed.contains(prevPartitionMap)) {
+ Option(prevPartitionMap.remove(partitionId))
} else {
- Option(prevPartitionMap.get(partitionId)).foreach { prevPartition =>
- if (prevPartition.isReplicaFor(brokerId)) {
- _localRemoved.add(prevPartition)
- }
+ Option(prevPartitionMap.get(partitionId)).map { prevPartition =>
val newPartitionMap = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() - 1)
prevPartitionMap.forEach { (prevPartitionId, prevPartition) =>
- if (!prevPartitionId.equals(partitionId)) {
+ if (prevPartitionId != partitionId) {
newPartitionMap.put(prevPartitionId, prevPartition)
}
}
changed.add(newPartitionMap)
newNameMap.put(topicName, newPartitionMap)
+ prevPartition
}
}
+ removedPartition.foreach(maybeAddToLocalRemoved)
}
}
def build(): MetadataPartitions = {
- val result = MetadataPartitions(newNameMap, newIdMap)
+ val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)
newNameMap = Collections.unmodifiableMap(newNameMap)
newIdMap = Collections.unmodifiableMap(newIdMap)
+ newReverseIdMap = Collections.unmodifiableMap(newReverseIdMap)
result
}
@@ -232,15 +279,15 @@ case class MetadataPartitions(private val nameMap: util.Map[String, util.Map[Int
def topicNameToId(name: String): Option[Uuid] = Option(reverseIdMap.get(name))
def copyNameMap(): util.Map[String, util.Map[Int, MetadataPartition]] = {
- val copy = new util.HashMap[String, util.Map[Int, MetadataPartition]](nameMap.size())
- copy.putAll(nameMap)
- copy
+ new util.HashMap(nameMap)
}
def copyIdMap(): util.Map[Uuid, String] = {
- val copy = new util.HashMap[Uuid, String](idMap.size())
- copy.putAll(idMap)
- copy
+ new util.HashMap(idMap)
+ }
+
+ def copyReverseIdMap(): util.Map[String, Uuid] = {
+ new util.HashMap(reverseIdMap)
}
def allPartitions(): Iterator[MetadataPartition] = new AllPartitionsIterator(nameMap).asScala
diff --git a/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
index a708d73..fcd0925 100644
--- a/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
+++ b/core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
@@ -18,13 +18,12 @@
package kafka.server.metadata
import java.util.Collections
-
-import org.apache.kafka.common.Uuid
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Test, Timeout}
import java.util.concurrent.TimeUnit
+import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.metadata.PartitionChangeRecord
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -38,9 +37,12 @@ class MetadataPartitionsTest {
private def newPartition(topicName: String,
partitionIndex: Int,
replicas: Option[Seq[Int]] = None,
- isr: Option[Seq[Int]] = None): MetadataPartition = {
- val effectiveReplicas = asJavaList(replicas
- .getOrElse(List(partitionIndex, partitionIndex + 1, partitionIndex + 2)))
+ isr: Option[Seq[Int]] = None,
+ numBrokers: Int = 6): MetadataPartition = {
+ val effectiveReplicas = asJavaList(replicas.getOrElse {
+ val preferredLeaderId = partitionIndex % numBrokers
+ List(preferredLeaderId, preferredLeaderId + 1, preferredLeaderId + 2)
+ })
val effectiveIsr = isr match {
case None => effectiveReplicas
@@ -127,12 +129,9 @@ class MetadataPartitionsTest {
@Test
def testAllTopicNames(): Unit = {
val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
- builder.set(newPartition("foo", 0))
- builder.set(newPartition("foo", 1))
- builder.set(newPartition("foo", 2))
- builder.set(newPartition("bar", 0))
- builder.set(newPartition("baz", 0))
- builder.set(newPartition("baz", 1))
+ createTopic("foo", numPartitions = 3, builder)
+ createTopic("bar", numPartitions = 2, builder)
+ createTopic("baz", numPartitions = 3, builder)
val image = builder.build()
val expectedTopicNames = new mutable.HashSet[String]()
expectedTopicNames += "foo"
@@ -185,6 +184,139 @@ class MetadataPartitionsTest {
assertEquals(2, updatedLeader.leaderId)
}
+ @Test
+ def testTopicCreateAndDelete(): Unit = {
+ val topic = "foo"
+ val numPartitions = 3
+ val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+ val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+ val topicId = createTopic(topic, numPartitions, builder)
+ val localTopicPartitions = localChanged(builder)
+
+ assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+ assertTrue(localTopicPartitions.nonEmpty)
+ assertNotEquals(topicPartitions, localTopicPartitions)
+
+ builder.removeTopicById(topicId)
+ assertEquals(None, builder.topicIdToName(topicId))
+ assertEquals(None, builder.topicNameToId(topic))
+ assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+ assertEquals(Set.empty, localRemoved(builder))
+ assertEquals(Set.empty, localChanged(builder))
+
+ val metadata = builder.build()
+ assertEquals(Set.empty, metadata.allTopicNames())
+ assertEquals(None, metadata.topicIdToName(topicId))
+ assertEquals(None, metadata.topicNameToId(topic))
+ assertEquals(Set.empty, metadata.topicPartitions(topic).toSet)
+ }
+
+ @Test
+ def testTopicRemoval(): Unit = {
+ val brokerId = 0
+ val topic = "foo"
+ val numPartitions = 3
+ val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+ val createBuilder = new MetadataPartitionsBuilder(brokerId, emptyPartitions)
+ val topicId = createTopic(topic, numPartitions, createBuilder)
+ val localTopicPartitions = localChanged(createBuilder)
+ val createMetadata = createBuilder.build()
+
+ assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+ assertTrue(localTopicPartitions.nonEmpty)
+ assertNotEquals(topicPartitions, localTopicPartitions)
+
+ val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, createMetadata)
+ deleteBuilder.removeTopicById(topicId)
+ assertEquals(None, deleteBuilder.topicIdToName(topicId))
+ assertEquals(None, deleteBuilder.topicNameToId(topic))
+ assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions))
+ assertEquals(localTopicPartitions, localRemoved(deleteBuilder))
+ assertEquals(Set.empty, localChanged(deleteBuilder))
+
+ val deleteMetadata = deleteBuilder.build()
+ assertEquals(Set.empty, deleteMetadata.allTopicNames())
+ assertEquals(None, deleteMetadata.topicIdToName(topicId))
+ assertEquals(None, deleteMetadata.topicNameToId(topic))
+ assertEquals(Set.empty, deleteMetadata.topicPartitions(topic).toSet)
+ }
+
+ @Test
+ def testTopicDeleteAndRecreate(): Unit = {
+ val topic = "foo"
+ val numPartitions = 3
+ val initialBuilder = new MetadataPartitionsBuilder(0, emptyPartitions)
+ val initialTopicId = createTopic(topic, numPartitions, initialBuilder)
+ val initialLocalTopicPartitions = initialBuilder.localChanged().map(_.toTopicPartition).toSet
+ val initialMetadata = initialBuilder.build()
+
+ val recreateBuilder = new MetadataPartitionsBuilder(brokerId = 0, initialMetadata)
+ recreateBuilder.removeTopicById(initialTopicId)
+ assertEquals(initialLocalTopicPartitions, localRemoved(recreateBuilder))
+
+ val recreatedNumPartitions = 10
+ val recreatedTopicId = createTopic(topic, recreatedNumPartitions, recreateBuilder)
+ val recreatedTopicPartitions = (0 until recreatedNumPartitions).map(new TopicPartition(topic, _)).toSet
+ val recreatedLocalTopicPartitions = localChanged(recreateBuilder)
+
+ assertTrue(recreatedLocalTopicPartitions.nonEmpty)
+ assertNotEquals(recreatedLocalTopicPartitions, recreatedTopicPartitions)
+ assertTrue(recreatedLocalTopicPartitions.subsetOf(recreatedTopicPartitions))
+ assertFalse(recreatedLocalTopicPartitions.subsetOf(initialLocalTopicPartitions))
+ assertEquals(Some(topic), recreateBuilder.topicIdToName(recreatedTopicId))
+ assertEquals(Some(recreatedTopicId), recreateBuilder.topicNameToId(topic))
+ assertEquals(recreatedTopicPartitions, filterPartitions(recreateBuilder, recreatedTopicPartitions))
+ assertEquals(initialLocalTopicPartitions, localRemoved(recreateBuilder))
+
+ val recreatedMetadata = recreateBuilder.build()
+ assertEquals(recreatedTopicPartitions, filterPartitions(recreatedMetadata, topic))
+ assertEquals(Some(recreatedTopicId), recreatedMetadata.topicNameToId(topic))
+ assertEquals(Some(topic), recreatedMetadata.topicIdToName(recreatedTopicId))
+ }
+
+ private def localRemoved(
+ builder: MetadataPartitionsBuilder
+ ): Set[TopicPartition] = {
+ builder.localRemoved().toSet[MetadataPartition].map(_.toTopicPartition)
+ }
+
+ private def localChanged(
+ builder: MetadataPartitionsBuilder
+ ): Set[TopicPartition] = {
+ builder.localChanged().toSet[MetadataPartition].map(_.toTopicPartition)
+ }
+
+ private def filterPartitions(
+ metadata: MetadataPartitions,
+ topic: String
+ ): Set[TopicPartition] = {
+ metadata.topicPartitions(topic).map(_.toTopicPartition).toSet
+ }
+
+ private def filterPartitions(
+ builder: MetadataPartitionsBuilder,
+ topicPartitions: Set[TopicPartition]
+ ): Set[TopicPartition] = {
+ topicPartitions.filter { topicPartition =>
+ builder.get(topicPartition.topic, topicPartition.partition).isDefined
+ }
+ }
+
+ private def createTopic(
+ topic: String,
+ numPartitions: Int,
+ builder: MetadataPartitionsBuilder
+ ): Uuid = {
+ val topicId = Uuid.randomUuid()
+ builder.addUuidMapping(topic, topicId)
+
+ (0 until numPartitions).foreach { partition =>
+ builder.set(newPartition(topic, partition))
+ }
+
+ topicId
+ }
+
private def asJavaList(replicas: Iterable[Int]): java.util.List[Integer] = {
replicas.map(Int.box).toList.asJava
}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
new file mode 100644
index 0000000..545fe48
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.Properties
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.LogConfig
+import kafka.server.RaftReplicaManager
+import kafka.utils.Implicits._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
+import org.apache.kafka.common.protocol.ApiMessage
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+class BrokerMetadataListenerTest {
+
+ private val brokerId = 1
+ private val time = new MockTime()
+ private val configRepository = new CachedConfigRepository
+ private val metadataCache = new RaftMetadataCache(brokerId)
+ private val groupCoordinator = mock(classOf[GroupCoordinator])
+ private val replicaManager = mock(classOf[RaftReplicaManager])
+ private val txnCoordinator = mock(classOf[TransactionCoordinator])
+ private val clientQuotaManager = mock(classOf[ClientQuotaMetadataManager])
+ private var lastMetadataOffset = 0L
+
+ private val listener = new BrokerMetadataListener(
+ brokerId,
+ time,
+ metadataCache,
+ configRepository,
+ groupCoordinator,
+ replicaManager,
+ txnCoordinator,
+ threadNamePrefix = None,
+ clientQuotaManager
+ )
+
+ @Test
+ def testTopicCreationAndDeletion(): Unit = {
+ val topicId = Uuid.randomUuid()
+ val topic = "foo"
+ val numPartitions = 10
+ val config = Map(
+ LogConfig.CleanupPolicyProp -> LogConfig.Compact,
+ LogConfig.MaxCompactionLagMsProp -> "5000"
+ )
+ val localPartitions = createAndAssert(topicId, topic, config, numPartitions, numBrokers = 4)
+ deleteTopic(topicId, topic, numPartitions, localPartitions)
+ }
+
+ private def deleteTopic(
+ topicId: Uuid,
+ topic: String,
+ numPartitions: Int,
+ localPartitions: Set[TopicPartition]
+ ): Unit = {
+ val deleteRecord = new RemoveTopicRecord()
+ .setTopicId(topicId)
+ lastMetadataOffset += 1
+ listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessage](
+ deleteRecord,
+ ).asJava)
+
+ assertFalse(metadataCache.contains(topic))
+ assertEquals(new Properties, configRepository.topicConfig(topic))
+
+ verify(groupCoordinator).handleDeletedPartitions(ArgumentMatchers.argThat[Seq[TopicPartition]] { partitions =>
+ partitions.toSet == partitionSet(topic, numPartitions)
+ })
+
+ val deleteImageCapture: ArgumentCaptor[MetadataImageBuilder] =
+ ArgumentCaptor.forClass(classOf[MetadataImageBuilder])
+ verify(replicaManager).handleMetadataRecords(
+ deleteImageCapture.capture(),
+ ArgumentMatchers.eq(lastMetadataOffset),
+ any()
+ )
+
+ val deleteImage = deleteImageCapture.getValue
+ assertTrue(deleteImage.hasPartitionChanges)
+ val localRemoved = deleteImage.partitionsBuilder().localRemoved()
+ assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
+ }
+
+ private def createAndAssert(
+ topicId: Uuid,
+ topic: String,
+ topicConfig: Map[String, String],
+ numPartitions: Int,
+ numBrokers: Int
+ ): Set[TopicPartition] = {
+ val records = new java.util.ArrayList[ApiMessage]
+ records.add(new TopicRecord()
+ .setName(topic)
+ .setTopicId(topicId)
+ )
+
+ val localTopicPartitions = mutable.Set.empty[TopicPartition]
+ (0 until numPartitions).map { partitionId =>
+ val preferredLeaderId = partitionId % numBrokers
+ val replicas = asJavaList(Seq(
+ preferredLeaderId,
+ preferredLeaderId + 1,
+ preferredLeaderId + 2
+ ))
+
+ if (replicas.contains(brokerId)) {
+ localTopicPartitions.add(new TopicPartition(topic, partitionId))
+ }
+
+ records.add(new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(partitionId)
+ .setLeader(preferredLeaderId)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ .setReplicas(replicas)
+ .setIsr(replicas)
+ )
+ }
+
+ topicConfig.forKeyValue { (key, value) =>
+ records.add(new ConfigRecord()
+ .setResourceName(topic)
+ .setResourceType(ConfigResource.Type.TOPIC.id())
+ .setName(key)
+ .setValue(value)
+ )
+ }
+
+ lastMetadataOffset += records.size()
+ listener.execCommits(lastOffset = lastMetadataOffset, records)
+ assertTrue(metadataCache.contains(topic))
+ assertEquals(Some(numPartitions), metadataCache.numPartitions(topic))
+ assertEquals(topicConfig, configRepository.topicConfig(topic).asScala)
+
+ val imageCapture: ArgumentCaptor[MetadataImageBuilder] =
+ ArgumentCaptor.forClass(classOf[MetadataImageBuilder])
+ verify(replicaManager).handleMetadataRecords(
+ imageCapture.capture(),
+ ArgumentMatchers.eq(lastMetadataOffset),
+ any()
+ )
+
+ val createImage = imageCapture.getValue
+ assertTrue(createImage.hasPartitionChanges)
+ val localChanged = createImage.partitionsBuilder().localChanged()
+ assertEquals(localTopicPartitions, localChanged.map(_.toTopicPartition).toSet)
+
+ localTopicPartitions.toSet
+ }
+
+ private def partitionSet(topic: String, numPartitions: Int): Set[TopicPartition] = {
+ (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+ }
+
+ private def asJavaList(replicas: Iterable[Int]): java.util.List[Integer] = {
+ replicas.map(Int.box).toList.asJava
+ }
+
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 6ee1b7e..eb73eef 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -790,7 +790,7 @@ public final class QuorumController implements Controller {
snapshotRegistry, sessionTimeoutNs, replicaPlacementPolicy);
this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
this.replicationControl = new ReplicationControlManager(snapshotRegistry,
- logContext, new Random(), defaultReplicationFactor, defaultNumPartitions,
+ logContext, defaultReplicationFactor, defaultNumPartitions,
configurationControl, clusterControl);
this.logManager = logManager;
this.metaLogListener = new QuorumMetaLogListener();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 4a58b3a..66b7374 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -72,7 +72,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.Random;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -246,11 +245,6 @@ public class ReplicationControlManager {
private final Logger log;
/**
- * The random number generator used by this object.
- */
- private final Random random;
-
- /**
* The KIP-464 default replication factor that is used if a CreateTopics request does
* not specify one.
*/
@@ -289,14 +283,12 @@ public class ReplicationControlManager {
ReplicationControlManager(SnapshotRegistry snapshotRegistry,
LogContext logContext,
- Random random,
short defaultReplicationFactor,
int defaultNumPartitions,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(ReplicationControlManager.class);
- this.random = random;
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.configurationControl = configurationControl;
@@ -516,7 +508,7 @@ public class ReplicationControlManager {
" times: " + e.getMessage());
}
}
- Uuid topicId = new Uuid(random.nextLong(), random.nextLong());
+ Uuid topicId = Uuid.randomUuid();
successes.put(topic.name(), new CreatableTopicResult().
setName(topic.name()).
setTopicId(topicId).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 8b00c10..fa7fc00 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -84,7 +84,6 @@ public class ReplicationControlManagerTest {
new LogContext(), snapshotRegistry, Collections.emptyMap());
final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry,
new LogContext(),
- random,
(short) 3,
1,
configurationControl,