Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/05/03 08:54:54 UTC

[GitHub] flink pull request #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE...

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5929#discussion_r185732772
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java ---
    @@ -74,7 +74,12 @@ protected void initializeConnections() {
     
     		try {
     			for (String topic : topics) {
    -				for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
    +				List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
    +				if (topicPartitions == null) {
    +					throw new IllegalStateException("The topic " + topic + " does not exist");
    --- End diff --
    
    I fear that this might be too aggressive.
    IMO, it is fine if the user has, say, 3 topics and one of them doesn't actually exist.
    
    What we should handle is the case where there are no partitions at all across all of the provided topics.
    Perhaps for an individual topic that has no partitions, we only write a log message?
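
    To illustrate, here is a rough sketch of the behavior I have in mind, not a patch for this PR.
    The class name, the method name, and the `partitionsFor` function parameter are hypothetical; the
    function stands in for `kafkaConsumer.partitionsFor(topic)` and may return null for a missing topic,
    which is the case the diff above guards against. Partitions are plain integers and the warning goes
    to stderr only to keep the sketch self-contained; real code would use the connector's own types and logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, for illustration only; not part of the Flink code base.
final class LenientPartitionDiscovery {

    // partitionsFor is an assumed stand-in for kafkaConsumer.partitionsFor(topic);
    // it may return null (or an empty list) when a topic does not exist.
    static List<String> discover(List<String> topics,
                                 Function<String, List<Integer>> partitionsFor) {
        List<String> discovered = new ArrayList<>();

        for (String topic : topics) {
            List<Integer> topicPartitions = partitionsFor.apply(topic);

            if (topicPartitions == null || topicPartitions.isEmpty()) {
                // A single missing topic is not fatal: log it and move on.
                System.err.println("WARN: topic " + topic + " has no partitions, skipping it");
                continue;
            }

            for (Integer partition : topicPartitions) {
                discovered.add(topic + "-" + partition);
            }
        }

        // Fail only when nothing at all was found across every provided topic.
        if (discovered.isEmpty()) {
            throw new IllegalStateException(
                "Could not find any partitions for the topics " + topics);
        }
        return discovered;
    }
}
```

    With this, a subscription to one existing topic and one missing topic still starts and only logs
    a warning, while a subscription consisting solely of missing topics fails fast.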


---