You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/09/09 07:16:23 UTC
[kafka] branch 3.3 updated: KAFKA-14194: Fix NPE in Cluster.nodeIfOnline (#12584)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 1df0220dfd KAFKA-14194: Fix NPE in Cluster.nodeIfOnline (#12584)
1df0220dfd is described below
commit 1df0220dfd83bcfcd859e81e9da491c0e6219db2
Author: Andrew Dean <nd...@gmail.com>
AuthorDate: Mon Sep 5 02:56:23 2022 -0500
KAFKA-14194: Fix NPE in Cluster.nodeIfOnline (#12584)
When the rack-aware consumer configuration is used and rolling updates are being applied to the Kafka brokers, metadata updates can be in a transient state in which a given topic-partition is missing from the metadata. This seems to resolve itself after a short time, but until it does, the `Cluster.nodeIfOnline` method throws an NPE. This patch checks that partition info is available for the given topic-partition before using it.
Reviewers: David Jacot <dj...@confluent.io>
---
clients/src/main/java/org/apache/kafka/common/Cluster.java | 3 ++-
.../src/test/java/org/apache/kafka/clients/MetadataTest.java | 12 ++++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 7d3f6f08a0..96e310df9a 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -252,7 +252,8 @@ public final class Cluster {
*/
public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
Node node = nodeById(id);
- if (node != null && !Arrays.asList(partition(partition).offlineReplicas()).contains(node)) {
+ PartitionInfo partitionInfo = partition(partition);
+ if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
return Optional.of(node);
} else {
return Optional.empty();
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index a4383d8991..e900aa44e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -791,6 +791,18 @@ public class MetadataTest {
assertEquals(metadata.fetch().nodeById(1).id(), 1);
}
+ @Test
+ public void testNodeIfOnlineNonExistentTopicPartition() {
+ MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+ TopicPartition tp = new TopicPartition("topic-1", 0);
+
+ assertEquals(metadata.fetch().nodeById(0).id(), 0);
+ assertNull(metadata.fetch().partition(tp));
+ assertEquals(metadata.fetch().nodeIfOnline(tp, 0), Optional.empty());
+ }
+
@Test
public void testLeaderMetadataInconsistentWithBrokerMetadata() {
// Tests a reordering scenario which can lead to inconsistent leader state.