You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/31 21:35:39 UTC
[kafka] branch 3.3 updated: KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller (#12533)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new c4fd780bd3 KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller (#12533)
c4fd780bd3 is described below
commit c4fd780bd351f709017a4e338c354a4ccf8f9adb
Author: Akhilesh C <ak...@users.noreply.github.com>
AuthorDate: Thu Aug 18 17:14:17 2022 -0700
KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller (#12533)
Fix a bug in ReplicationControlManager where we got a NullPointerException when removing a topic
with no offline replicas, and there were other topics that did have offline replicas.
Fix an issue in MetadataDelta#replay where we were replaying RemoveTopicRecord twice.
Reviewers: Colin P. McCabe <cm...@apache.org>, dengziming <de...@gmail.com>
---
.../kafka/integration/KafkaServerTestHarness.scala | 2 +-
.../kafka/server/DeleteTopicsRequestTest.scala | 36 ++++++++++++++++++++++
.../org/apache/kafka/controller/BrokersToIsrs.java | 2 +-
.../java/org/apache/kafka/image/MetadataDelta.java | 3 +-
4 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 4322650780..fe1922cc2b 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -232,7 +232,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = topic,
- brokers = brokers)
+ brokers = aliveBrokers)
}
} else {
adminZkClient.deleteTopic(topic)
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 644f21ff3f..629f203169 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -32,10 +32,46 @@ import org.apache.kafka.common.requests.MetadataResponse
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.Seq
import scala.jdk.CollectionConverters._
class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTopicDeletionClusterHasOfflinePartitions(quorum: String): Unit = {
+ // Create a two topics with one partition/replica. Make one of them offline.
+ val offlineTopic = "topic-1"
+ val onlineTopic = "topic-2"
+ createTopicWithAssignment(offlineTopic, Map[Int, Seq[Int]](0 -> Seq(0)))
+ createTopicWithAssignment(onlineTopic, Map[Int, Seq[Int]](0 -> Seq(1)))
+ killBroker(0)
+ ensureConsistentKRaftMetadata()
+
+ // Ensure one topic partition is offline.
+ TestUtils.waitUntilTrue(() => {
+ aliveBrokers.head.metadataCache.getPartitionInfo(onlineTopic, 0).exists(_.leader() == 1) &&
+ aliveBrokers.head.metadataCache.getPartitionInfo(offlineTopic, 0).exists(_.leader() ==
+ MetadataResponse.NO_LEADER_ID)
+ }, "Topic partition is not offline")
+
+ // Delete the newly created topic and topic with offline partition. See the deletion is
+ // successful.
+ deleteTopic(onlineTopic)
+ deleteTopic(offlineTopic)
+ ensureConsistentKRaftMetadata()
+
+ // Restart the dead broker.
+ restartDeadBrokers()
+
+ // Make sure the brokers no longer see any deleted topics.
+ TestUtils.waitUntilTrue(() =>
+ !aliveBrokers.forall(_.metadataCache.contains(onlineTopic)) &&
+ !aliveBrokers.forall(_.metadataCache.contains(offlineTopic)),
+ "The topics are found in the Broker's cache")
+ }
+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testValidDeleteTopicRequests(quorum: String): Unit = {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index d12e663406..ec48cbc57a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -191,7 +191,7 @@ public class BrokersToIsrs {
void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
if (topicMap != null) {
- if (brokerId == NO_LEADER) {
+ if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
}
topicMap.remove(topicId);
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 25e141ea0d..d12885ce7b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -257,8 +257,7 @@ public final class MetadataDelta {
}
public void replay(RemoveTopicRecord record) {
- getOrCreateTopicsDelta().replay(record);
- String topicName = topicsDelta.replay(record);
+ String topicName = getOrCreateTopicsDelta().replay(record);
getOrCreateConfigsDelta().replay(record, topicName);
}