You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/08/01 07:24:54 UTC

[flink] branch master updated: [FLINK-11321] Clarify NPE on fetching nonexistent topic (#7487)

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ce5de6be [FLINK-11321] Clarify NPE on fetching nonexistent topic (#7487)
ce5de6be is described below

commit ce5de6be563ffe6e36b8f917a7f500ffc2b49c08
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Thu Aug 1 09:24:40 2019 +0200

    [FLINK-11321] Clarify NPE on fetching nonexistent topic (#7487)
    
    [FLINK-11321][connector/kafka] Throw explicit exception on fetching nonexistent topic.
    
    Before this fix, a NullPointerException was thrown when a consumer tried to fetch from a nonexistent topic. The fix throws a RuntimeException with a clear message instead.
---
 .../kafka/internal/Kafka09PartitionDiscoverer.java         | 10 ++++++++--
 .../kafka/internal/KafkaPartitionDiscoverer.java           | 14 ++++++++++----
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
index 5e55995..a9ebaeb 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
@@ -69,12 +69,18 @@ public class Kafka09PartitionDiscoverer extends AbstractPartitionDiscoverer {
 	}
 
 	@Override
-	protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException {
+	protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException, RuntimeException {
 		List<KafkaTopicPartition> partitions = new LinkedList<>();
 
 		try {
 			for (String topic : topics) {
-				for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+				final List<PartitionInfo> kafkaPartitions = kafkaConsumer.partitionsFor(topic);
+
+				if (kafkaPartitions == null) {
+					throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
+				}
+
+				for (PartitionInfo partitionInfo : kafkaPartitions) {
 					partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
 				}
 			}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java
index cc705ea..1c871bc 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java
@@ -69,18 +69,24 @@ public class KafkaPartitionDiscoverer extends AbstractPartitionDiscoverer {
 	}
 
 	@Override
-	protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws AbstractPartitionDiscoverer.WakeupException {
-		List<KafkaTopicPartition> partitions = new LinkedList<>();
+	protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException, RuntimeException {
+		final List<KafkaTopicPartition> partitions = new LinkedList<>();
 
 		try {
 			for (String topic : topics) {
-				for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+				final List<PartitionInfo> kafkaPartitions = kafkaConsumer.partitionsFor(topic);
+
+				if (kafkaPartitions == null) {
+					throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
+				}
+
+				for (PartitionInfo partitionInfo : kafkaPartitions) {
 					partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
 				}
 			}
 		} catch (org.apache.kafka.common.errors.WakeupException e) {
 			// rethrow our own wakeup exception
-			throw new AbstractPartitionDiscoverer.WakeupException();
+			throw new WakeupException();
 		}
 
 		return partitions;