Posted to issues@flink.apache.org by "Rico Bergmann (JIRA)" <ji...@apache.org> on 2015/08/05 13:37:04 UTC

[jira] [Commented] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source

    [ https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14655231#comment-14655231 ] 

Rico Bergmann commented on FLINK-2325:
--------------------------------------

I think that when a message arrives in the run method, you should check whether the partition number of the message is >= lastOffsets.length. This would help both for topics created on the fly and for topics that get repartitioned.
Hope you understand what I mean ;-)
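
A minimal sketch of the check described above, not the actual PersistentKafkaSource code. The class OffsetArrays, the method ensureCapacity and the constant OFFSET_NOT_SET are hypothetical names made up for illustration; only lastOffsets comes from the issue. The idea is to grow the offsets array before indexing it whenever a message arrives from a partition that was unknown when open(.) ran.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of Flink's PersistentKafkaSource. */
public class OffsetArrays {

    /** Marker for "no offset seen yet"; the value is an assumption of this sketch. */
    public static final long OFFSET_NOT_SET = -1L;

    /**
     * Returns an array that can safely be indexed by {@code partition}.
     * If the given array is already long enough it is returned unchanged;
     * otherwise a longer copy is returned whose new slots hold OFFSET_NOT_SET.
     */
    public static long[] ensureCapacity(long[] offsets, int partition) {
        if (partition < offsets.length) {
            return offsets;
        }
        long[] grown = Arrays.copyOf(offsets, partition + 1);
        Arrays.fill(grown, offsets.length, grown.length, OFFSET_NOT_SET);
        return grown;
    }

    public static void main(String[] args) {
        // The topic did not exist when open(.) ran, so the array has length 0.
        long[] lastOffsets = new long[0];

        // First message arrives from partition 0 with offset 42.
        int partition = 0;
        long offset = 42L;

        // Check partition >= lastOffsets.length and grow before writing.
        lastOffsets = ensureCapacity(lastOffsets, partition);
        lastOffsets[partition] = offset;

        System.out.println(Arrays.toString(lastOffsets)); // prints [42]
    }
}
```

The same treatment would presumably be needed for committedOffsets, since both arrays are sized from the partition count read in open(.).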

> PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2325
>                 URL: https://issues.apache.org/jira/browse/FLINK-2325
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 0.9
>            Reporter: Rico Bergmann
>            Assignee: Robert Metzger
>
> I'm creating a PersistentKafkaSource that reads from a specified Kafka topic which does not yet exist at the time the PersistentKafkaSource is started (via open(.)). As a result, the number of partitions read in the open(.) function is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets).
> Maybe it would be better to check whether numberOfPartitions returns 0 and, if so, to take the default number of partitions from the Kafka config?
> Stacktrace:
> java.lang.ArrayIndexOutOfBoundsException: 0
> 	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)