Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/03 04:56:00 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

hachikuji opened a new pull request #10252:
URL: https://github.com/apache/kafka/pull/10252


   This patch implements additional handling logic for `RemoveTopic` records:
   
   - Update `MetadataPartitions` to ensure addition of deleted partitions to `localRemoved` set
   - Ensure topic configs are removed from `ConfigRepository`
   - Propagate deleted partitions to `GroupCoordinator` so that corresponding offset commits can be removed
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r589597256



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +202,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)

Review comment:
       We depend on changes to the `_localChanged` set in order to receive new leader and ISR state.







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586643126



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -204,9 +248,10 @@ class MetadataPartitionsBuilder(val brokerId: Int,
   }
 
   def build(): MetadataPartitions = {
-    val result = MetadataPartitions(newNameMap, newIdMap)
+    val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)
     newNameMap = Collections.unmodifiableMap(newNameMap)

Review comment:
       That's a fair point. Maybe it would be better to raise an exception in `set` after the image has been built.
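
A minimal sketch of that idea, assuming a much simplified builder. `SealableBuilder` and its string-valued partition state are hypothetical stand-ins, not the actual `MetadataPartitionsBuilder`:

```scala
import java.util

// Hypothetical, simplified stand-in for a partitions builder; not the Kafka class.
final class SealableBuilder {
  private val partitions = new util.HashMap[Int, String]()
  private var built = false

  def set(partitionId: Int, state: String): Unit = {
    // Fail fast instead of silently mutating an image that was already handed out.
    if (built) throw new IllegalStateException("Cannot call set() after build()")
    partitions.put(partitionId, state)
  }

  def build(): util.Map[Int, String] = {
    built = true
    // Read-only view for callers; the builder refuses further writes from here on.
    util.Collections.unmodifiableMap(partitions)
  }
}
```

With this shape, a late `set` surfaces as an exception at the call site rather than as a quietly modified image.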







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587673099



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -483,7 +475,7 @@ private ApiError createTopic(CreatableTopic topic,
                         " times: " + e.getMessage());
             }
         }
-        Uuid topicId = new Uuid(random.nextLong(), random.nextLong());
+        Uuid topicId = Uuid.randomUuid();

Review comment:
       Yeah, I'm aware. But it felt like something that had to be done. The rate of topic creation is typically not high anyway.







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587857619



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
       case e: Exception => throw new RuntimeException("Unknown metadata record type " +
       s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
     }
-    recordType match {
-      case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[RegisterBrokerRecord])
-      case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnregisterBrokerRecord])
-      case TOPIC_RECORD => handleTopicRecord(imageBuilder,
-        record.asInstanceOf[TopicRecord])
-      case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
-        record.asInstanceOf[PartitionRecord])
-      case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
-      case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
-        record.asInstanceOf[PartitionChangeRecord])
-      case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[FenceBrokerRecord])
-      case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnfenceBrokerRecord])
-      case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
-        record.asInstanceOf[RemoveTopicRecord])
-      case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
-        record.asInstanceOf[QuotaRecord])
-      // TODO: handle FEATURE_LEVEL_RECORD
-      case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")
+
+    record match {
+      case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
+      case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
+      case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
+      case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
+      case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
+      case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)

Review comment:
       Ordering assumptions are fair when it comes to the metadata log, and the topic record is indeed created before the partition records. Interestingly, the topic config records come before the topic record. Although it seems safe with the current logic, I think a more intuitive order would be:
    
   1. TopicRecord
   2. ConfigRecord(s)
   3. PartitionRecord(s)
   
   Let me file a JIRA about this.







[GitHub] [kafka] chia7712 commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r588882359



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +202,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)

Review comment:
       Is it redundant to add this partition to `_localChanged` if the broker is already a replica of that partition?

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -198,15 +242,18 @@ class MetadataPartitionsBuilder(val brokerId: Int,
           }
           changed.add(newPartitionMap)
           newNameMap.put(topicName, newPartitionMap)
+          prevPartition
         }
       }
+      removedPartition.foreach(maybeAddToLocalRemoved)
     }
   }
 
   def build(): MetadataPartitions = {
-    val result = MetadataPartitions(newNameMap, newIdMap)
+    val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)

Review comment:
       Is it worth checking the consistency of those collections? That would fail fast if we are about to build an invalid image.
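
One possible shape for such a check, as a sketch only. `ImageConsistency.validate` is a hypothetical helper, topic IDs are modeled as plain strings, and the invariant checked (the ID map and the reverse ID map are exact inverses) is an assumption about what "consistent" should mean here:

```scala
import java.util
import scala.jdk.CollectionConverters._

object ImageConsistency {
  // Hypothetical helper, not part of Kafka: throws if the ID -> name and
  // name -> ID maps are not exact inverses. Topic IDs are plain strings here.
  def validate(idToName: util.Map[String, String],
               nameToId: util.Map[String, String]): Unit = {
    if (idToName.size() != nameToId.size()) {
      throw new IllegalStateException(
        s"ID map has ${idToName.size()} entries but reverse map has ${nameToId.size()}")
    }
    idToName.asScala.foreach { case (id, name) =>
      val reverseId = nameToId.get(name)
      if (reverseId != id) {
        throw new IllegalStateException(
          s"Topic $name maps back to ID $reverseId, expected $id")
      }
    }
  }
}
```

Calling something like this at the top of `build()` would cost one pass over the ID map per image.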

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +202,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)
+      val prevImageHasTopic = if (currentTopicId != null) {
+        prevImageHasTopicId(currentTopicId)
+      } else {
+        prevPartitions.allTopicNames().contains(partition.topicName)
+      }
+
+      if (prevImageHasTopic) {
+        _localRemoved.add(partition)
+      }
+    }
+  }
+
+  private def prevImageHasTopicId(topicId: Uuid): Boolean = {
+    prevPartitions.topicIdToName(topicId).isDefined
+  }
+
   def remove(topicName: String, partitionId: Int): Unit = {
     val prevPartitionMap = newNameMap.get(topicName)
     if (prevPartitionMap != null) {
-      if (changed.contains(prevPartitionMap)) {
-        val prevPartition = prevPartitionMap.remove(partitionId)
-        if (prevPartition.isReplicaFor(brokerId)) {
-          _localRemoved.add(prevPartition)
-        }
+      val removedPartition = if (changed.contains(prevPartitionMap)) {
+        Option(prevPartitionMap.remove(partitionId))
       } else {
-        Option(prevPartitionMap.get(partitionId)).foreach { prevPartition =>
-          if (prevPartition.isReplicaFor(brokerId)) {
-            _localRemoved.add(prevPartition)
-          }
+        Option(prevPartitionMap.get(partitionId)).map { prevPartition =>
           val newPartitionMap = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() - 1)
           prevPartitionMap.forEach { (prevPartitionId, prevPartition) =>
             if (!prevPartitionId.equals(partitionId)) {

Review comment:
       As we are in Scala code, could it be replaced by `prevPartitionId != partitionId`?
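
For illustration, a small sketch of the copy-without-one-partition loop written with `!=`. `EqualityDemo.withoutPartition` is a hypothetical helper and partition state is modeled as a string; in Scala, `==` and `!=` compare by value, so no explicit `equals` call is needed for the `Int` keys:

```scala
import java.util

object EqualityDemo {
  // Copies every entry except the one keyed by `removedId`, comparing keys with `!=`.
  def withoutPartition(prev: util.Map[Int, String], removedId: Int): util.Map[Int, String] = {
    // Guard against a negative initial capacity when `prev` is empty.
    val next = new util.HashMap[Int, String](math.max(prev.size() - 1, 0))
    prev.forEach { (id, state) =>
      if (id != removedId) next.put(id, state)
    }
    next
  }
}
```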







[GitHub] [kafka] chia7712 commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r589621057



##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.Properties
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.LogConfig
+import kafka.server.RaftReplicaManager
+import kafka.utils.Implicits._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
+import org.apache.kafka.common.protocol.ApiMessage
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+class BrokerMetadataListenerTest {

Review comment:
       It seems that not all messages are covered in this class. If so, is there a JIRA to complete those test cases?







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586646515



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -172,23 +203,36 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
     } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {

Review comment:
       Yes, it is possible. In this case, we should not remove the local state, so it does not get added to `_localRemoved`.
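
The branch order can be sketched as follows. `LocalStateDemo`, `Partition`, and `Outcome` are hypothetical names, and the sketch deliberately leaves out the extra check against the previous image that `maybeAddToLocalRemoved` performs:

```scala
object LocalStateDemo {
  final case class Partition(topic: String, id: Int, replicas: Set[Int]) {
    def isReplicaFor(brokerId: Int): Boolean = replicas.contains(brokerId)
  }

  sealed trait Outcome
  case object Changed extends Outcome
  case object Removed extends Outcome
  case object Ignored extends Outcome

  // The new assignment is checked first, so a broker that stays a replica
  // is reported as Changed and never as Removed.
  def classify(brokerId: Int, prev: Option[Partition], next: Partition): Outcome =
    if (next.isReplicaFor(brokerId)) Changed
    else if (prev.exists(_.isReplicaFor(brokerId))) Removed
    else Ignored
}
```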







[GitHub] [kafka] chia7712 commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586208095



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")
+
+      case Some(name) =>
+        newReverseIdMap.remove(name)
+
+        val prevPartitionMap = newNameMap.remove(name)

Review comment:
       Could you add a comment explaining why `prevPartitionMap` is NOT null? `newNameMap` is updated by `PartitionRecord`, unlike `newReverseIdMap` and `newIdMap`.

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -172,23 +203,36 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
     } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit= {
+    val currentTopicId = newReverseIdMap.get(partition.topicName)
+    val prevImageContainsTopic = if (currentTopicId != null) {
+      prevPartitions.topicIdToName(currentTopicId).isDefined
+    } else {
+      prevPartitions.allTopicNames().contains(partition.topicName)
+    }
+
+    if (prevImageContainsTopic) {
+      _localRemoved.add(partition)
+    }
+  }
+
   def remove(topicName: String, partitionId: Int): Unit = {
     val prevPartitionMap = newNameMap.get(topicName)
     if (prevPartitionMap != null) {
       if (changed.contains(prevPartitionMap)) {
         val prevPartition = prevPartitionMap.remove(partitionId)
         if (prevPartition.isReplicaFor(brokerId)) {

Review comment:
       Does this need a null check? The code that follows calls `Option(prevPartitionMap.get(partitionId))`, so it seems to me the null check is necessary.

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -172,23 +203,36 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
     } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {

Review comment:
       Is it possible that both the previous partition (`prevPartition`) and the new partition are replicas for this broker?

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -243,6 +288,12 @@ case class MetadataPartitions(private val nameMap: util.Map[String, util.Map[Int
     copy
   }
 
+  def copyReverseIdMap(): util.Map[String, Uuid] = {

Review comment:
       How about using `new util.HashMap[String, Uuid](reverseIdMap)`?
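
A sketch of that suggestion, using `java.util.UUID` as a stand-in for Kafka's `Uuid` so the example is self-contained. `CopyDemo` is a hypothetical wrapper; the `HashMap` copy constructor itself is standard Java:

```scala
import java.util
import java.util.UUID

object CopyDemo {
  // The HashMap copy constructor copies all mappings in one call;
  // the keys and values themselves are shared, not cloned.
  def copyReverseIdMap(reverseIdMap: util.Map[String, UUID]): util.Map[String, UUID] =
    new util.HashMap[String, UUID](reverseIdMap)
}
```

The copy is independent of the source map, so later `put` or `remove` calls on one do not show up in the other.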

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")

Review comment:
       It seems there are many similar checks in this class. Could we have a helper method to deal with this case?

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -222,9 +216,16 @@ class BrokerMetadataListener(brokerId: Int,
 
   def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
                               record: RemoveTopicRecord): Unit = {
-    val removedPartitions = imageBuilder.partitionsBuilder().
-      removeTopicById(record.topicId())
-    groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+    imageBuilder.topicIdToName(record.topicId()) match {
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")
+
+      case Some(topicName) =>
+        info(s"Processing deletion of topic $topicName with id ${record.topicId}")
+        val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
+        groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+        configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))

Review comment:
       It seems to me that the config removal should happen even if the topic ID does not exist, since we set configs without checking whether the topic exists.

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")
+
+      case Some(name) =>
+        newReverseIdMap.remove(name)
+
+        val prevPartitionMap = newNameMap.remove(name)
+        changed.remove(prevPartitionMap)
+
+        val removedPartitions = prevPartitionMap.values
+        if (prevPartitions.topicIdToName(id).isDefined) {
+          removedPartitions.forEach { partition =>
+            if (partition.isReplicaFor(brokerId)) {

Review comment:
       Could we reuse `maybeAddToLocalRemoved` to handle `localRemoved` set?

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -204,9 +248,10 @@ class MetadataPartitionsBuilder(val brokerId: Int,
   }
 
   def build(): MetadataPartitions = {
-    val result = MetadataPartitions(newNameMap, newIdMap)
+    val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)
     newNameMap = Collections.unmodifiableMap(newNameMap)

Review comment:
       Just curious: why don't we wrap the values of `newNameMap` in immutable maps? It seems `MetadataPartitionsBuilder#set(MetadataPartition)` is able to cause changes after the metadata image has been built.
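
A small sketch of the concern, with hypothetical names and string-valued partition state. `Collections.unmodifiableMap` only protects the outer map, so the inner per-topic maps stay writable through the wrapper:

```scala
import java.util

object ShallowWrapDemo {
  // Returns (was the outer put rejected?, size of the inner map after an inner put).
  def demo(): (Boolean, Int) = {
    val inner = new util.HashMap[Int, String]()
    inner.put(0, "leader")
    val outer = new util.HashMap[String, util.Map[Int, String]]()
    outer.put("topic", inner)
    val image = util.Collections.unmodifiableMap(outer)

    // Writes to the wrapped outer map are rejected.
    val outerRejected =
      try { image.put("other", new util.HashMap[Int, String]()); false }
      catch { case _: UnsupportedOperationException => true }

    // Writes to an inner map still go through, because only the outer map is wrapped.
    image.get("topic").put(1, "follower")
    (outerRejected, image.get("topic").size())
  }
}
```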







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586693956



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")

Review comment:
       There is one similar case in `handleChange`, but it is not removing the topicId. Not sure it is worthwhile adding a helper for a duplicated exception.







[GitHub] [kafka] ijuma commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587126260



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
       case e: Exception => throw new RuntimeException("Unknown metadata record type " +
       s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
     }
-    recordType match {
-      case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[RegisterBrokerRecord])
-      case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnregisterBrokerRecord])
-      case TOPIC_RECORD => handleTopicRecord(imageBuilder,
-        record.asInstanceOf[TopicRecord])
-      case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
-        record.asInstanceOf[PartitionRecord])
-      case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
-      case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
-        record.asInstanceOf[PartitionChangeRecord])
-      case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[FenceBrokerRecord])
-      case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnfenceBrokerRecord])
-      case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
-        record.asInstanceOf[RemoveTopicRecord])
-      case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
-        record.asInstanceOf[QuotaRecord])
-      // TODO: handle FEATURE_LEVEL_RECORD

Review comment:
       Is this TODO no longer relevant?







[GitHub] [kafka] hachikuji merged pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10252:
URL: https://github.com/apache/kafka/pull/10252


   





[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r589627144



##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.Properties
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.LogConfig
+import kafka.server.RaftReplicaManager
+import kafka.utils.Implicits._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
+import org.apache.kafka.common.protocol.ApiMessage
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+class BrokerMetadataListenerTest {

Review comment:
       Good idea. I filed https://issues.apache.org/jira/browse/KAFKA-12437.







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586709124



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")
+
+      case Some(name) =>
+        newReverseIdMap.remove(name)
+
+        val prevPartitionMap = newNameMap.remove(name)
+        changed.remove(prevPartitionMap)
+
+        val removedPartitions = prevPartitionMap.values
+        if (prevPartitions.topicIdToName(id).isDefined) {
+          removedPartitions.forEach { partition =>
+            if (partition.isReplicaFor(brokerId)) {

Review comment:
       I couldn't come up with a nice way to do this without adding unnecessary/redundant checks. I did pull out a little helper for checking if the previous image has a particular topic id.







[GitHub] [kafka] hachikuji commented on pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#issuecomment-791038017


   Well, I _was_ going to write some integration tests, but it seems we are still awaiting some infrastructure for that. I tested it out manually and it works correctly. I see the brokers deleting the partition data as expected. I tested deletion as well as recreation.





[GitHub] [kafka] ijuma commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587128413



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -483,7 +475,7 @@ private ApiError createTopic(CreatableTopic topic,
                         " times: " + e.getMessage());
             }
         }
-        Uuid topicId = new Uuid(random.nextLong(), random.nextLong());
+        Uuid topicId = Uuid.randomUuid();

Review comment:
       Note that we have switched from a regular `Random` to a `SecureRandom`. Probably a good idea, but it could have perf implications.
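To make the difference concrete, here is a minimal sketch that uses only JDK classes, not Kafka's `Uuid`. The helper names `uuidFromRandom` and `uuidFromSecureRandom` are hypothetical and exist only for illustration. The first mirrors the removed line (two `nextLong()` calls on a regular `Random`); the second draws the same bits from a `SecureRandom`, which is the kind of source the new call is described as using.

```scala
import java.security.SecureRandom
import java.util.{Random, UUID}

object UuidSourceSketch {
  // Old style: two 64-bit values from a regular, seedable Random.
  def uuidFromRandom(random: Random): UUID =
    new UUID(random.nextLong(), random.nextLong())

  // New style (assumption for illustration): same bits, drawn from a SecureRandom.
  def uuidFromSecureRandom(random: SecureRandom): UUID =
    new UUID(random.nextLong(), random.nextLong())

  def main(args: Array[String]): Unit = {
    // Two Randoms with the same seed produce the same id, i.e. the output is predictable.
    val first = uuidFromRandom(new Random(42L))
    val second = uuidFromRandom(new Random(42L))
    println(s"seeded Random repeats: ${first == second}")

    // SecureRandom is not reproducible from a seed we choose here.
    println(s"SecureRandom id: ${uuidFromSecureRandom(new SecureRandom())}")
  }
}
```

Whether the extra cost matters depends on how often topics are created; topic creation is presumably rare enough that it would not show up, but that is an assumption and not something measured in this PR.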







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r588524835



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
       case e: Exception => throw new RuntimeException("Unknown metadata record type " +
       s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
     }
-    recordType match {
-      case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[RegisterBrokerRecord])
-      case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnregisterBrokerRecord])
-      case TOPIC_RECORD => handleTopicRecord(imageBuilder,
-        record.asInstanceOf[TopicRecord])
-      case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
-        record.asInstanceOf[PartitionRecord])
-      case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
-      case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
-        record.asInstanceOf[PartitionChangeRecord])
-      case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[FenceBrokerRecord])
-      case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnfenceBrokerRecord])
-      case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
-        record.asInstanceOf[RemoveTopicRecord])
-      case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
-        record.asInstanceOf[QuotaRecord])
-      // TODO: handle FEATURE_LEVEL_RECORD
-      case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")
+
+    record match {
+      case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
+      case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
+      case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
+      case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
+      case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
+      case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)

Review comment:
       FYI: https://issues.apache.org/jira/browse/KAFKA-12433







[GitHub] [kafka] chia7712 commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r588878150



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
       Thanks for the nice explanation. I have another question about this check. It seems to me that the three collections in `prevPartitions` should be consistent. For example, a topic that exists in one of the collections should also exist in the other two (and vice versa). If that is right, why do we need this `if-else`? Wouldn't calling `prevPartitions.contains(partition.topicName)` be enough?







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586695760



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID $id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")
+
+      case Some(name) =>
+        newReverseIdMap.remove(name)
+
+        val prevPartitionMap = newNameMap.remove(name)

Review comment:
       Maybe we can just be a little defensive here and check it. 







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r589600950



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -198,15 +242,18 @@ class MetadataPartitionsBuilder(val brokerId: Int,
           }
           changed.add(newPartitionMap)
           newNameMap.put(topicName, newPartitionMap)
+          prevPartition
         }
       }
+      removedPartition.foreach(maybeAddToLocalRemoved)
     }
   }
 
   def build(): MetadataPartitions = {
-    val result = MetadataPartitions(newNameMap, newIdMap)
+    val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)

Review comment:
       I would say yes if there was a cheap way to do it. However, I don't think it's unreasonable to depend on internally maintained invariants here.







[GitHub] [kafka] chia7712 commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587205708



##########
File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))

Review comment:
       How about `assertTrue(localTopicPartitions.subsetOf(topicPartitions))` (same for the other test cases)?
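As a quick illustration of why the two forms are interchangeable, the sketch below compares them on small sets. `TopicPartition` here is a stand-in case class so the snippet has no Kafka dependency, and the helper names are hypothetical.

```scala
object SubsetCheckSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition (assumption for illustration).
  final case class TopicPartition(topic: String, partition: Int)

  // The form used in the test as written.
  def viaForall[A](xs: Set[A], ys: Set[A]): Boolean = xs.forall(ys.contains)

  // The suggested form; it states the intent directly.
  def viaSubsetOf[A](xs: Set[A], ys: Set[A]): Boolean = xs.subsetOf(ys)

  def main(args: Array[String]): Unit = {
    val all = (0 until 3).map(TopicPartition("foo", _)).toSet
    val local = Set(TopicPartition("foo", 0), TopicPartition("foo", 2))
    // Both calls ask whether every local partition is one of the topic's partitions.
    println(viaForall(local, all))
    println(viaSubsetOf(local, all))
  }
}
```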

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
       Is it possible that `newReverseIdMap` has no related id? For example, if `PartitionRecord` is processed before `TopicRecord`, or if `TopicRecord` was discarded (due to an error)?

##########
File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    builder.removeTopicById(topicId)
+    assertEquals(None, builder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+    assertEquals(Set.empty, localRemoved(builder))
+    assertEquals(Set.empty, localChanged(builder))
+
+    val metadata = builder.build()
+    assertEquals(Set.empty, metadata.allTopicNames())
+    assertEquals(None, metadata.topicIdToName(topicId))
+    assertEquals(Set.empty, metadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicRemoval(): Unit = {
+    val brokerId = 0
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val createBuilder = new MetadataPartitionsBuilder(brokerId, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, createBuilder)
+    val localTopicPartitions = localChanged(createBuilder)
+    val createMetadata = createBuilder.build()
+
+    assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, createMetadata)
+    deleteBuilder.removeTopicById(topicId)
+    assertEquals(None, deleteBuilder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions))
+    assertEquals(localTopicPartitions, localRemoved(deleteBuilder))
+    assertEquals(Set.empty, localChanged(deleteBuilder))
+
+    val deleteMetadata = deleteBuilder.build()
+    assertEquals(Set.empty, deleteMetadata.allTopicNames())
+    assertEquals(None, deleteMetadata.topicIdToName(topicId))

Review comment:
       Ditto. Could you check that `reverseIdMap` has no id related to "topic"?

##########
File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    builder.removeTopicById(topicId)
+    assertEquals(None, builder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+    assertEquals(Set.empty, localRemoved(builder))
+    assertEquals(Set.empty, localChanged(builder))
+
+    val metadata = builder.build()
+    assertEquals(Set.empty, metadata.allTopicNames())
+    assertEquals(None, metadata.topicIdToName(topicId))
+    assertEquals(Set.empty, metadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicRemoval(): Unit = {
+    val brokerId = 0
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val createBuilder = new MetadataPartitionsBuilder(brokerId, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, createBuilder)
+    val localTopicPartitions = localChanged(createBuilder)
+    val createMetadata = createBuilder.build()
+
+    assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, createMetadata)
+    deleteBuilder.removeTopicById(topicId)
+    assertEquals(None, deleteBuilder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions))
+    assertEquals(localTopicPartitions, localRemoved(deleteBuilder))
+    assertEquals(Set.empty, localChanged(deleteBuilder))
+
+    val deleteMetadata = deleteBuilder.build()
+    assertEquals(Set.empty, deleteMetadata.allTopicNames())
+    assertEquals(None, deleteMetadata.topicIdToName(topicId))
+    assertEquals(Set.empty, deleteMetadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicDeleteAndRecreate(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val initialBuilder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val initialTopicId = createTopic(topic, numPartitions, initialBuilder)
+    val initialLocalTopicPartitions = initialBuilder.localChanged().map(_.toTopicPartition).toSet
+    val initialMetadata = initialBuilder.build()
+
+    val recreateBuilder = new MetadataPartitionsBuilder(brokerId = 0, initialMetadata)
+    recreateBuilder.removeTopicById(initialTopicId)
+    assertEquals(initialLocalTopicPartitions, localRemoved(recreateBuilder))
+
+    val recreatedNumPartitions = 10
+    val recreatedTopicId = createTopic(topic, recreatedNumPartitions, recreateBuilder)
+    val recreatedTopicPartitions = (0 until recreatedNumPartitions).map(new TopicPartition(topic, _)).toSet
+    val recreatedLocalTopicPartitions = localChanged(recreateBuilder)
+
+    assertTrue(recreatedLocalTopicPartitions.nonEmpty)
+    assertNotEquals(recreatedLocalTopicPartitions, recreatedTopicPartitions)
+    assertTrue(recreatedLocalTopicPartitions.subsetOf(recreatedTopicPartitions))
+    assertFalse(recreatedLocalTopicPartitions.subsetOf(initialLocalTopicPartitions))
+    assertEquals(Some(topic), recreateBuilder.topicIdToName(recreatedTopicId))

Review comment:
       Could you also check that the id related to "topic" has changed to `recreatedTopicId`?

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
       case e: Exception => throw new RuntimeException("Unknown metadata record type " +
       s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
     }
-    recordType match {
-      case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[RegisterBrokerRecord])
-      case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnregisterBrokerRecord])
-      case TOPIC_RECORD => handleTopicRecord(imageBuilder,
-        record.asInstanceOf[TopicRecord])
-      case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
-        record.asInstanceOf[PartitionRecord])
-      case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
-      case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
-        record.asInstanceOf[PartitionChangeRecord])
-      case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[FenceBrokerRecord])
-      case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnfenceBrokerRecord])
-      case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
-        record.asInstanceOf[RemoveTopicRecord])
-      case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
-        record.asInstanceOf[QuotaRecord])
-      // TODO: handle FEATURE_LEVEL_RECORD
-      case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")
+
+    record match {
+      case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
+      case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
+      case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
+      case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
+      case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
+      case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)

Review comment:
       Do we "assume" that `PartitionRecord` is processed after `TopicRecord`? [BrokerMetadataListener#handlePartitionRecord](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala#L188) checks for the existence of the topic id first. It can throw an exception if `PartitionRecord` is processed "before" `TopicRecord` (the previous image has no related data because it is a new topic).
   
   On the other hand, [RaftMetadataCache#updateMetadata](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala#L364) sets partition info before updating topic info.
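The ordering concern can be sketched as follows. This is a simplified model and not the listener's actual code: the record types, `ImageBuilder`, and `partitionsOf` are hypothetical stand-ins. It only shows that a handler which resolves the topic id first will fail when a partition record arrives before its topic record.

```scala
import scala.collection.mutable

object RecordOrderingSketch {
  // Hypothetical, simplified records; the real ones carry many more fields.
  sealed trait Record
  final case class TopicRecord(topicId: String, name: String) extends Record
  final case class PartitionRecord(topicId: String, partitionId: Int) extends Record

  final class ImageBuilder {
    private val idToName = mutable.Map.empty[String, String]
    private val partitions = mutable.Map.empty[String, mutable.Set[Int]]

    def replay(record: Record): Unit = record match {
      case TopicRecord(id, name) =>
        idToName.put(id, name)
      case PartitionRecord(id, partitionId) =>
        // The topic id must already be known, otherwise the record cannot be applied.
        val name = idToName.getOrElse(id,
          throw new RuntimeException(s"Unable to locate topic with ID $id"))
        partitions.getOrElseUpdate(name, mutable.Set.empty[Int]).add(partitionId)
    }

    def partitionsOf(name: String): Set[Int] =
      partitions.get(name).map(_.toSet).getOrElse(Set.empty)
  }
}
```

Replaying `TopicRecord` and then `PartitionRecord` succeeds; replaying the `PartitionRecord` alone on a fresh builder throws.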

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
       One more thing: could it be replaced by `prevPartitions.contains(partition.topicName)`? It seems all we want to check is whether the topic name exists in the previous image.

##########
File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    builder.removeTopicById(topicId)
+    assertEquals(None, builder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+    assertEquals(Set.empty, localRemoved(builder))
+    assertEquals(Set.empty, localChanged(builder))
+
+    val metadata = builder.build()
+    assertEquals(Set.empty, metadata.allTopicNames())
+    assertEquals(None, metadata.topicIdToName(topicId))

Review comment:
       Could you check `reverseIdMap` also? It should return `None`.







[GitHub] [kafka] ijuma commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587677099



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -483,7 +475,7 @@ private ApiError createTopic(CreatableTopic topic,
                         " times: " + e.getMessage());
             }
         }
-        Uuid topicId = new Uuid(random.nextLong(), random.nextLong());
+        Uuid topicId = Uuid.randomUuid();

Review comment:
       Can we note this in the PR description?







[GitHub] [kafka] hachikuji commented on pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#issuecomment-791027372


   I am working on some integration tests here. Hopefully then we can wrap this up.





[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586724166



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -172,23 +203,36 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
     } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit= {
+    val currentTopicId = newReverseIdMap.get(partition.topicName)
+    val prevImageContainsTopic = if (currentTopicId != null) {
+      prevPartitions.topicIdToName(currentTopicId).isDefined
+    } else {
+      prevPartitions.allTopicNames().contains(partition.topicName)
+    }
+
+    if (prevImageContainsTopic) {
+      _localRemoved.add(partition)
+    }
+  }
+
   def remove(topicName: String, partitionId: Int): Unit = {
     val prevPartitionMap = newNameMap.get(topicName)
     if (prevPartitionMap != null) {
       if (changed.contains(prevPartitionMap)) {
         val prevPartition = prevPartitionMap.remove(partitionId)
         if (prevPartition.isReplicaFor(brokerId)) {

Review comment:
       Yeah, good point. Looking at how it is used in `RaftMetadataCache.updateMetadata`, I don't see how we can assume the partition previously existed. There are really no ordering guarantees that we can get for that case. I find it pretty confusing that we try to let this class seamlessly handle metadata updates from both the zk and raft controllers. I would really rather get rid of `RaftMetadataCache.updateMetadata` altogether and let this class be only for the raft controller.
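A defensive version of the removal could look roughly like the sketch below. It is not the patch itself: `Partition` and `removeLocal` are hypothetical stand-ins. The point is only that `java.util.Map.remove` returns `null` for a missing key, so the result is wrapped in `Option` before `isReplicaFor` is called.

```scala
import java.util.{HashMap => JHashMap}

object DefensiveRemoveSketch {
  // Hypothetical stand-in for MetadataPartition.
  final case class Partition(topic: String, id: Int, replicas: Set[Int]) {
    def isReplicaFor(brokerId: Int): Boolean = replicas.contains(brokerId)
  }

  // Removes the partition if present and returns it only when this broker hosted a replica.
  def removeLocal(partitions: JHashMap[Integer, Partition],
                  partitionId: Int,
                  brokerId: Int): Option[Partition] = {
    // A missing key yields null, which Option(...) turns into None.
    Option(partitions.remove(Int.box(partitionId))).filter(_.isReplicaFor(brokerId))
  }
}
```

With this shape, removing a partition that never existed is a no-op instead of a `NullPointerException`.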







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587850882



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
       The intent is to only return the change in `_localRemoved` if the topic existed in the previous image. If we only check topic name, then successive deletions and recreations might leave some partitions in `_localRemoved` that were not in the previous image.
   
   It's worth noting that this is strictly more defensive than the current replay logic requires. A new image is built for each batch of records from the controller, and we would never see a topic deleted and recreated (or vice versa) in the same batch. This is an implicit contract though and not protected by the builder API, so I thought we might as well try to make the logic more resilient.
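A small sketch of the difference between the two checks, under simplifying assumptions: the previous image is modeled as a plain id-to-name map, ids are strings, and `PrevImage`, `shouldTrackByName`, and `shouldTrackById` are hypothetical names. The id-aware variant follows the shape of `maybeAddToLocalRemoved` from the earlier revision of this diff.

```scala
object PrevImageCheckSketch {
  // Hypothetical, simplified view of the previous image: topic id -> topic name.
  final case class PrevImage(idToName: Map[String, String]) {
    def containsName(name: String): Boolean = idToName.values.exists(_ == name)
    def containsId(id: String): Boolean = idToName.contains(id)
  }

  // Name-only check: cannot tell a recreated topic from the original one.
  def shouldTrackByName(prev: PrevImage, topicName: String): Boolean =
    prev.containsName(topicName)

  // Id-aware check: use the current id when the builder knows one,
  // otherwise (topic already deleted in this builder) fall back to the name.
  def shouldTrackById(prev: PrevImage,
                      currentNameToId: Map[String, String],
                      topicName: String): Boolean =
    currentNameToId.get(topicName) match {
      case Some(id) => prev.containsId(id)
      case None => prev.containsName(topicName)
    }
}
```

Suppose the previous image has "foo" with id `id-1`, and within one builder the topic is deleted and recreated as `id-2`. If a partition of the recreated topic then moves off this broker, the name-only check would add it to `_localRemoved` even though it never existed in the previous image; the id-aware check would not.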







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586724166



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -172,23 +203,36 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
     } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit= {
+    val currentTopicId = newReverseIdMap.get(partition.topicName)
+    val prevImageContainsTopic = if (currentTopicId != null) {
+      prevPartitions.topicIdToName(currentTopicId).isDefined
+    } else {
+      prevPartitions.allTopicNames().contains(partition.topicName)
+    }
+
+    if (prevImageContainsTopic) {
+      _localRemoved.add(partition)
+    }
+  }
+
   def remove(topicName: String, partitionId: Int): Unit = {
     val prevPartitionMap = newNameMap.get(topicName)
     if (prevPartitionMap != null) {
       if (changed.contains(prevPartitionMap)) {
         val prevPartition = prevPartitionMap.remove(partitionId)
         if (prevPartition.isReplicaFor(brokerId)) {

Review comment:
       Yeah, good point. Looking at how it is used in `RaftMetadataCache.updateMetadata`, I don't see how we can assume the partition previously existed. There are really no ordering guarantees that we can get for that case. I find it pretty confusing that we try to let this class seamlessly handle metadata updates from both the zk and raft controllers. I would really rather get rid of `RaftMetadataCache.updateMetadata` altogether.







[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586728427



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -222,9 +216,16 @@ class BrokerMetadataListener(brokerId: Int,
 
   def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
                               record: RemoveTopicRecord): Unit = {
-    val removedPartitions = imageBuilder.partitionsBuilder().
-      removeTopicById(record.topicId())
-    groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+    imageBuilder.topicIdToName(record.topicId()) match {
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")
+
+      case Some(topicName) =>
+        info(s"Processing deletion of topic $topicName with id ${record.topicId}")
+        val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
+        groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+        configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))

Review comment:
       I see what you mean. The `RemoveTopic` schema does not include the topic name, so our hands are tied here at the moment. The `Config` schema, on the other hand, uses topic name, which is a consequence of the way that we treat configs. Probably `RemoveTopic` should also indicate the topic name. I will file a separate JIRA to think about this.
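The constraint can be sketched with a toy in-memory model. Everything here (`State`, `createTopic`, string ids) is hypothetical; it only illustrates that, because the removal record carries just the id while configs are keyed by name, the name has to be resolved before the id mapping is dropped.

```scala
import scala.collection.mutable

object RemoveTopicSketch {
  // Hypothetical in-memory state: removal arrives by id, configs are keyed by name.
  final class State {
    val idToName = mutable.Map.empty[String, String]
    val topicConfigs = mutable.Map.empty[String, Map[String, String]]

    def createTopic(id: String, name: String, configs: Map[String, String]): Unit = {
      idToName.put(id, name)
      topicConfigs.put(name, configs)
    }

    // Resolve the name from the id first, then clean up the name-keyed state.
    def removeTopicById(id: String): String = {
      val name = idToName.remove(id).getOrElse(
        throw new RuntimeException(s"Unable to locate topic with ID $id"))
      topicConfigs.remove(name)
      name
    }
  }
}
```

If the record also carried the topic name, the lookup step would be unnecessary, which is the schema change suggested above.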



