You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/18 10:18:42 UTC

[GitHub] [kafka] dengziming commented on a diff in pull request #12533: KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller

dengziming commented on code in PR #12533:
URL: https://github.com/apache/kafka/pull/12533#discussion_r948947569


##########
metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java:
##########
@@ -191,7 +191,7 @@ void update(Uuid topicId, int partitionId, int[] prevIsr, int[] nextIsr,
     void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
         Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
         if (topicMap != null) {
-            if (brokerId == NO_LEADER) {
+            if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
                 offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);

Review Comment:
   Does this mean the `offlinePartitionCount` is no longer accurate? we don't update `offlinePartitionCount` when we delete a topic.



##########
core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala:
##########
@@ -32,10 +32,45 @@ import org.apache.kafka.common.requests.MetadataResponse
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicDeletionClusterHasOfflinePartitions(quorum: String): Unit = {
+    // Create a two topics with one partition/replica. Make one of them offline.
+    val offlineTopic = "topic-1"
+    val onlineTopic = "topic-2"
+    createTopicWithAssignment(offlineTopic, Map[Int, Seq[Int]](0 -> Seq(0)))
+    createTopicWithAssignment(onlineTopic, Map[Int, Seq[Int]](0 -> Seq(1)))
+    killBroker(0)
+    ensureConsistentKRaftMetadata()
+
+    // Ensure one topic partition is offline.
+    TestUtils.waitUntilTrue(() => {
+      aliveBrokers.head.metadataCache.getPartitionInfo(onlineTopic, 0).exists(_.leader() == 1)

Review Comment:
   We miss a "&&" between these 2 expressions.



##########
core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala:
##########
@@ -32,10 +32,45 @@ import org.apache.kafka.common.requests.MetadataResponse
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicDeletionClusterHasOfflinePartitions(quorum: String): Unit = {
+    // Create a two topics with one partition/replica. Make one of them offline.
+    val offlineTopic = "topic-1"
+    val onlineTopic = "topic-2"
+    createTopicWithAssignment(offlineTopic, Map[Int, Seq[Int]](0 -> Seq(0)))
+    createTopicWithAssignment(onlineTopic, Map[Int, Seq[Int]](0 -> Seq(1)))
+    killBroker(0)
+    ensureConsistentKRaftMetadata()
+
+    // Ensure one topic partition is offline.
+    TestUtils.waitUntilTrue(() => {
+      aliveBrokers.head.metadataCache.getPartitionInfo(onlineTopic, 0).exists(_.leader() == 1)
+      aliveBrokers.head.metadataCache.getPartitionInfo(offlineTopic, 0).exists(_.leader() == -1)

Review Comment:
   nit: using MetadataResponse.NO_LEADER_ID rather than -1



##########
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##########
@@ -173,11 +173,11 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     adminClientConfig: Properties = new Properties
   ): scala.collection.immutable.Map[Int, Int] = {
     if (isKRaftTest()) {
-      resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
+      resource(createAdminClient(aliveBrokers, listenerName, adminClientConfig)) { admin =>

Review Comment:
   Is this change necessary? I think AdminClient should be robust enough to recognize dead and alive brokers and handle it properly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org