You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/31 21:35:39 UTC

[kafka] branch 3.3 updated: KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller (#12533)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new c4fd780bd3 KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller (#12533)
c4fd780bd3 is described below

commit c4fd780bd351f709017a4e338c354a4ccf8f9adb
Author: Akhilesh C <ak...@users.noreply.github.com>
AuthorDate: Thu Aug 18 17:14:17 2022 -0700

    KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller (#12533)
    
    Fix a bug in ReplicationControlManager where we got a NullPointerException when removing a topic
    with no offline replicas, and there were other topics that did have offline replicas.
    
    Fix an issue in MetadataDelta#replay where we were replaying RemoveTopicRecord twice.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, dengziming <de...@gmail.com>
---
 .../kafka/integration/KafkaServerTestHarness.scala |  2 +-
 .../kafka/server/DeleteTopicsRequestTest.scala     | 36 ++++++++++++++++++++++
 .../org/apache/kafka/controller/BrokersToIsrs.java |  2 +-
 .../java/org/apache/kafka/image/MetadataDelta.java |  3 +-
 4 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 4322650780..fe1922cc2b 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -232,7 +232,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
         TestUtils.deleteTopicWithAdmin(
           admin = admin,
           topic = topic,
-          brokers = brokers)
+          brokers = aliveBrokers)
       }
     } else {
       adminZkClient.deleteTopic(topic)
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 644f21ff3f..629f203169 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -32,10 +32,46 @@ import org.apache.kafka.common.requests.MetadataResponse
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicDeletionClusterHasOfflinePartitions(quorum: String): Unit = {
+    // Create a two topics with one partition/replica. Make one of them offline.
+    val offlineTopic = "topic-1"
+    val onlineTopic = "topic-2"
+    createTopicWithAssignment(offlineTopic, Map[Int, Seq[Int]](0 -> Seq(0)))
+    createTopicWithAssignment(onlineTopic, Map[Int, Seq[Int]](0 -> Seq(1)))
+    killBroker(0)
+    ensureConsistentKRaftMetadata()
+
+    // Ensure one topic partition is offline.
+    TestUtils.waitUntilTrue(() => {
+      aliveBrokers.head.metadataCache.getPartitionInfo(onlineTopic, 0).exists(_.leader() == 1) &&
+        aliveBrokers.head.metadataCache.getPartitionInfo(offlineTopic, 0).exists(_.leader() ==
+          MetadataResponse.NO_LEADER_ID)
+    }, "Topic partition is not offline")
+
+    // Delete the newly created topic and topic with offline partition. See the deletion is
+    // successful.
+    deleteTopic(onlineTopic)
+    deleteTopic(offlineTopic)
+    ensureConsistentKRaftMetadata()
+
+    // Restart the dead broker.
+    restartDeadBrokers()
+
+    // Make sure the brokers no longer see any deleted topics.
+    TestUtils.waitUntilTrue(() =>
+      !aliveBrokers.forall(_.metadataCache.contains(onlineTopic)) &&
+        !aliveBrokers.forall(_.metadataCache.contains(offlineTopic)),
+      "The topics are found in the Broker's cache")
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testValidDeleteTopicRequests(quorum: String): Unit = {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index d12e663406..ec48cbc57a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -191,7 +191,7 @@ public class BrokersToIsrs {
     void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
         Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
         if (topicMap != null) {
-            if (brokerId == NO_LEADER) {
+            if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
                 offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
             }
             topicMap.remove(topicId);
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 25e141ea0d..d12885ce7b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -257,8 +257,7 @@ public final class MetadataDelta {
     }
 
     public void replay(RemoveTopicRecord record) {
-        getOrCreateTopicsDelta().replay(record);
-        String topicName = topicsDelta.replay(record);
+        String topicName = getOrCreateTopicsDelta().replay(record);
         getOrCreateConfigsDelta().replay(record, topicName);
     }