You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/03 08:55:00 UTC

[jira] [Commented] (FLINK-8497) KafkaConsumer throws NPE if topic doesn't exist

    [ https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462134#comment-16462134 ] 

ASF GitHub Bot commented on FLINK-8497:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5929#discussion_r185732772
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java ---
    @@ -74,7 +74,12 @@ protected void initializeConnections() {
     
     		try {
     			for (String topic : topics) {
    -				for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
    +				List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
    +				if (topicPartitions == null) {
    +					throw new IllegalStateException("The topic " + topic + " does not exist");
    --- End diff --
    
    I fear that this might be too aggressive.
    IMO, it is fine that the user has, say 3 topics, but only one of them actually doesn't exist.
    
    What we should handle is the case where there is completely no partitions at all across all provided topics.
    Perhaps for this, we only write a log that some topic has no partitions?


> KafkaConsumer throws NPE if topic doesn't exist
> -----------------------------------------------
>
>                 Key: FLINK-8497
>                 URL: https://issues.apache.org/jira/browse/FLINK-8497
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: chris snow
>            Assignee: Aleksei Lesnov
>            Priority: Minor
>
> If I accidentally set the kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011(
>    "does_not_exist",
>     new JSONKeyValueDeserializationSchema(false),
>     properties
>     );
> DataStream<String> input = env.addSource(kafkaConsumer);{code}
> Flink throws NPE
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could through an IllegalStateException("Topic not found")?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)