You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2021/05/26 07:01:11 UTC

[kafka] branch trunk updated: KAFKA-12260: Avoid hitting NPE for partitionsFor (#10017)

This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4f2f6f  KAFKA-12260: Avoid hitting NPE for partitionsFor (#10017)
e4f2f6f is described below

commit e4f2f6f6e82cafbdea785d53521b96fe062e172d
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Tue May 25 23:59:30 2021 -0700

    KAFKA-12260: Avoid hitting NPE for partitionsFor (#10017)
    
    Remove null pointer from the public partitionsFor API.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  6 +++---
 .../kafka/clients/consumer/MockConsumer.java       |  2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 22 ++++++++++++++++++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  4 ++--
 .../ConnectorTopicsIntegrationTest.java            | 11 ++++++-----
 5 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1f4bc7c..992edc6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1901,7 +1901,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param topic The topic to get partition metadata for
      *
-     * @return The list of partitions
+     * @return The list of partitions, which will be empty when the given topic is not found
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1924,7 +1924,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param topic The topic to get partition metadata for
      * @param timeout The maximum of time to await topic metadata
      *
-     * @return The list of partitions
+     * @return The list of partitions, which will be empty when the given topic is not found
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1948,7 +1948,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Timer timer = time.timer(timeout);
             Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
                     new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer);
-            return topicMetadata.get(topic);
+            return topicMetadata.getOrDefault(topic, Collections.emptyList());
         } finally {
             release();
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 345cdec..ed29afe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -376,7 +376,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Override
     public synchronized List<PartitionInfo> partitionsFor(String topic) {
         ensureNotClosed();
-        return this.partitions.get(topic);
+        return this.partitions.getOrDefault(topic, Collections.emptyList());
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b125a91..59f72cd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1758,6 +1758,28 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testPartitionsForNonExistingTopic() {
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Cluster cluster = metadata.fetch();
+
+        MetadataResponse updateResponse = RequestTestUtils.metadataResponse(cluster.nodes(),
+            cluster.clusterResource().clusterId(),
+            cluster.controller().id(),
+            Collections.emptyList());
+        client.prepareResponse(updateResponse);
+
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
+    }
+
+    @Test
     public void testPartitionsForAuthenticationFailure() {
         final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
         assertThrows(AuthenticationException.class, () -> consumer.partitionsFor("some other topic"));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6e2350f..b1920d5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -175,12 +175,12 @@ public class KafkaBasedLog<K, V> {
         List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
         long started = time.nanoseconds();
         long sleepMs = 100;
-        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+        while (partitionInfos.isEmpty() && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
             time.sleep(sleepMs);
             sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
             partitionInfos = consumer.partitionsFor(topic);
         }
-        if (partitionInfos == null)
+        if (partitionInfos.isEmpty())
             throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +
                     " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
                     " this is your first use of the topic it may have taken too long to create.");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index 75374a9..8c4e156 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.integration;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -40,7 +41,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -260,10 +260,11 @@ public class ConnectorTopicsIntegrationTest {
         Consumer<byte[], byte[]> verifiableConsumer = connect.kafka().createConsumer(
                 Collections.singletonMap("group.id", "verifiable-consumer-group-0"));
 
-        List<TopicPartition> partitions =
-                Optional.ofNullable(verifiableConsumer.partitionsFor(statusTopic))
-                .orElseThrow(() -> new AssertionError("Unable to retrieve partitions info for status topic"))
-                .stream()
+        List<PartitionInfo> partitionInfos = verifiableConsumer.partitionsFor(statusTopic);
+        if (partitionInfos.isEmpty()) {
+            throw new AssertionError("Unable to retrieve partitions info for status topic");
+        }
+        List<TopicPartition> partitions = partitionInfos.stream()
                 .map(info -> new TopicPartition(info.topic(), info.partition()))
                 .collect(Collectors.toList());
         verifiableConsumer.assign(partitions);