You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/08/01 07:24:54 UTC
[flink] branch master updated: [FLINK-11321] Clarify NPE on
fetching nonexistent topic (#7487)
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ce5de6be [FLINK-11321] Clarify NPE on fetching nonexistent topic (#7487)
ce5de6be is described below
commit ce5de6be563ffe6e36b8f917a7f500ffc2b49c08
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Thu Aug 1 09:24:40 2019 +0200
[FLINK-11321] Clarify NPE on fetching nonexistent topic (#7487)
[FLINK-11321][connector/kafka] Throw explicit exception on fetching nonexistent topic.
Before the fix, NPE will be thrown if a consumer is trying to fetch from a nonexistent topic. The fix throws a RuntimeException with a clear message.
---
.../kafka/internal/Kafka09PartitionDiscoverer.java | 10 ++++++++--
.../kafka/internal/KafkaPartitionDiscoverer.java | 14 ++++++++++----
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
index 5e55995..a9ebaeb 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
@@ -69,12 +69,18 @@ public class Kafka09PartitionDiscoverer extends AbstractPartitionDiscoverer {
}
@Override
- protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException {
+ protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException, RuntimeException {
List<KafkaTopicPartition> partitions = new LinkedList<>();
try {
for (String topic : topics) {
- for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+ final List<PartitionInfo> kafkaPartitions = kafkaConsumer.partitionsFor(topic);
+
+ if (kafkaPartitions == null) {
+ throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
+ }
+
+ for (PartitionInfo partitionInfo : kafkaPartitions) {
partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java
index cc705ea..1c871bc 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java
@@ -69,18 +69,24 @@ public class KafkaPartitionDiscoverer extends AbstractPartitionDiscoverer {
}
@Override
- protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws AbstractPartitionDiscoverer.WakeupException {
- List<KafkaTopicPartition> partitions = new LinkedList<>();
+ protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException, RuntimeException {
+ final List<KafkaTopicPartition> partitions = new LinkedList<>();
try {
for (String topic : topics) {
- for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+ final List<PartitionInfo> kafkaPartitions = kafkaConsumer.partitionsFor(topic);
+
+ if (kafkaPartitions == null) {
+ throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
+ }
+
+ for (PartitionInfo partitionInfo : kafkaPartitions) {
partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
} catch (org.apache.kafka.common.errors.WakeupException e) {
// rethrow our own wakeup exception
- throw new AbstractPartitionDiscoverer.WakeupException();
+ throw new WakeupException();
}
return partitions;