You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/09/09 07:16:23 UTC

[kafka] branch 3.3 updated: KAFKA-14194: Fix NPE in Cluster.nodeIfOnline (#12584)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 1df0220dfd KAFKA-14194: Fix NPE in Cluster.nodeIfOnline (#12584)
1df0220dfd is described below

commit 1df0220dfd83bcfcd859e81e9da491c0e6219db2
Author: Andrew Dean <nd...@gmail.com>
AuthorDate: Mon Sep 5 02:56:23 2022 -0500

    KAFKA-14194: Fix NPE in Cluster.nodeIfOnline (#12584)
    
    When the rack-aware consumer configuration is in use and rolling updates are being applied to the Kafka brokers, the metadata can be in a transient state in which a given topic-partition is missing from it. This seems to resolve itself after a short time, but until it does, the `Cluster.nodeIfOnline` method throws an NPE. This patch checks that partition info is available for the given topic-partition before using it.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 clients/src/main/java/org/apache/kafka/common/Cluster.java   |  3 ++-
 .../src/test/java/org/apache/kafka/clients/MetadataTest.java | 12 ++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 7d3f6f08a0..96e310df9a 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -252,7 +252,8 @@ public final class Cluster {
      */
     public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
         Node node = nodeById(id);
-        if (node != null && !Arrays.asList(partition(partition).offlineReplicas()).contains(node)) {
+        PartitionInfo partitionInfo = partition(partition);
+        if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
             return Optional.of(node);
         } else {
             return Optional.empty();
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index a4383d8991..e900aa44e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -791,6 +791,18 @@ public class MetadataTest {
         assertEquals(metadata.fetch().nodeById(1).id(), 1);
     }
 
+    @Test
+    public void testNodeIfOnlineNonExistentTopicPartition() {
+        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+
+        assertEquals(metadata.fetch().nodeById(0).id(), 0);
+        assertNull(metadata.fetch().partition(tp));
+        assertEquals(metadata.fetch().nodeIfOnline(tp, 0), Optional.empty());
+    }
+
     @Test
     public void testLeaderMetadataInconsistentWithBrokerMetadata() {
         // Tests a reordering scenario which can lead to inconsistent leader state.