You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2021/05/26 07:01:11 UTC
[kafka] branch trunk updated: KAFKA-12260: Avoid hitting NPE for
partitionsFor (#10017)
This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4f2f6f KAFKA-12260: Avoid hitting NPE for partitionsFor (#10017)
e4f2f6f is described below
commit e4f2f6f6e82cafbdea785d53521b96fe062e172d
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Tue May 25 23:59:30 2021 -0700
KAFKA-12260: Avoid hitting NPE for partitionsFor (#10017)
Remove null pointer from the public partitionsFor API.
Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
.../kafka/clients/consumer/KafkaConsumer.java | 6 +++---
.../kafka/clients/consumer/MockConsumer.java | 2 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 22 ++++++++++++++++++++++
.../apache/kafka/connect/util/KafkaBasedLog.java | 4 ++--
.../ConnectorTopicsIntegrationTest.java | 11 ++++++-----
5 files changed, 34 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1f4bc7c..992edc6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1901,7 +1901,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param topic The topic to get partition metadata for
*
- * @return The list of partitions
+ * @return The list of partitions, which will be empty when the given topic is not found
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1924,7 +1924,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param topic The topic to get partition metadata for
* @param timeout The maximum time to await topic metadata
*
- * @return The list of partitions
+ * @return The list of partitions, which will be empty when the given topic is not found
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1948,7 +1948,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Timer timer = time.timer(timeout);
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer);
- return topicMetadata.get(topic);
+ return topicMetadata.getOrDefault(topic, Collections.emptyList());
} finally {
release();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 345cdec..ed29afe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -376,7 +376,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Override
public synchronized List<PartitionInfo> partitionsFor(String topic) {
ensureNotClosed();
- return this.partitions.get(topic);
+ return this.partitions.getOrDefault(topic, Collections.emptyList());
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b125a91..59f72cd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1758,6 +1758,28 @@ public class KafkaConsumerTest {
}
@Test
+ public void testPartitionsForNonExistingTopic() {
+ Time time = new MockTime();
+ SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Cluster cluster = metadata.fetch();
+
+ MetadataResponse updateResponse = RequestTestUtils.metadataResponse(cluster.nodes(),
+ cluster.clusterResource().clusterId(),
+ cluster.controller().id(),
+ Collections.emptyList());
+ client.prepareResponse(updateResponse);
+
+ ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
+ }
+
+ @Test
public void testPartitionsForAuthenticationFailure() {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
assertThrows(AuthenticationException.class, () -> consumer.partitionsFor("some other topic"));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6e2350f..b1920d5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -175,12 +175,12 @@ public class KafkaBasedLog<K, V> {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
long started = time.nanoseconds();
long sleepMs = 100;
- while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+ while (partitionInfos.isEmpty() && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
time.sleep(sleepMs);
sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
partitionInfos = consumer.partitionsFor(topic);
}
- if (partitionInfos == null)
+ if (partitionInfos.isEmpty())
throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +
" allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
" this is your first use of the topic it may have taken too long to create.");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index 75374a9..8c4e156 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -40,7 +41,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -260,10 +260,11 @@ public class ConnectorTopicsIntegrationTest {
Consumer<byte[], byte[]> verifiableConsumer = connect.kafka().createConsumer(
Collections.singletonMap("group.id", "verifiable-consumer-group-0"));
- List<TopicPartition> partitions =
- Optional.ofNullable(verifiableConsumer.partitionsFor(statusTopic))
- .orElseThrow(() -> new AssertionError("Unable to retrieve partitions info for status topic"))
- .stream()
+ List<PartitionInfo> partitionInfos = verifiableConsumer.partitionsFor(statusTopic);
+ if (partitionInfos.isEmpty()) {
+ throw new AssertionError("Unable to retrieve partitions info for status topic");
+ }
+ List<TopicPartition> partitions = partitionInfos.stream()
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());
verifiableConsumer.assign(partitions);