Posted to issues@flink.apache.org by "Sergey Nuyanzin (JIRA)" <ji...@apache.org> on 2018/05/17 10:53:00 UTC

[jira] [Comment Edited] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

    [ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478886#comment-16478886 ] 

Sergey Nuyanzin edited comment on FLINK-9349 at 5/17/18 10:52 AM:
------------------------------------------------------------------

Hello,
I was able to write a test (based on an existing one) that reproduces this issue and one more related issue.
The second one is: {noformat}Caused by: java.lang.NullPointerException
	at java.util.LinkedList$ListItr.next(LinkedList.java:893)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.doCommitInternalOffsetsToKafka(Kafka09Fetcher.java:228)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.commitInternalOffsetsToKafka(AbstractFetcher.java:293){noformat}
It is caused by concurrent access to subscribedPartitionStates, a LinkedList, which is not thread-safe.
From my point of view there are two possible solutions:
# add synchronization, as you mentioned
# use a thread-safe collection, e.g. CopyOnWriteArrayList, instead of LinkedList for subscribedPartitionStates
 
Both options pass the test.
The code for the test is attached: [^Flink9349Test.java]
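For illustration only, here is a minimal sketch of option 1 in plain Java. The class SynchronizedPartitionStates and its methods are hypothetical, not Flink's actual Kafka09Fetcher code, and partition states are modelled as strings. It guards the LinkedList with a single lock and lets readers iterate over a copy taken under that lock. Holding the lock for the whole iteration would also work, but would block partition discovery while the fetch loop or the offset commit is running.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of option 1, NOT Flink's Kafka09Fetcher: partition states are modelled as strings.
class SynchronizedPartitionStates {
    private final Object lock = new Object();
    // Guarded by 'lock': every access to the LinkedList happens while holding it.
    private final List<String> subscribedPartitionStates = new LinkedList<>();

    // Writer side, e.g. the partition-discovery thread.
    void addDiscoveredPartitions(List<String> discovered) {
        synchronized (lock) {
            subscribedPartitionStates.addAll(discovered);
        }
    }

    // Reader side, e.g. the fetch loop or the offset-commit path. The list is copied under
    // the lock, so callers can iterate the copy without holding the lock.
    List<String> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(subscribedPartitionStates);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedPartitionStates states = new SynchronizedPartitionStates();
        Thread discoverer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                states.addDiscoveredPartitions(Collections.singletonList("topic-" + i));
            }
        });
        discoverer.start();
        // Keep iterating while the other thread adds partitions; each pass walks a private copy.
        long visited = 0;
        while (discoverer.isAlive()) {
            for (String partition : states.snapshot()) {
                visited += partition.isEmpty() ? 0 : 1;
            }
        }
        discoverer.join();
        System.out.println("partitions: " + states.snapshot().size() + ", visited during run: " + visited);
    }
}
```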

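The second option needs no synchronization and may be preferable if partitions are added to subscribedPartitionStates infrequently, because CopyOnWriteArrayList copies its entire backing array on every modification; otherwise, synchronization makes more sense.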
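A minimal sketch of option 2, again with hypothetical names (CopyOnWritePartitionStates, iterateWhileAdding) and strings standing in for partition states. It adds an element after every step of an iteration: a CopyOnWriteArrayList iterator keeps working on the array it was created from, so it neither throws nor sees the new elements, while the same loop over a LinkedList fails with ConcurrentModificationException.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration of option 2 (not Flink code): partition states are modelled as strings.
class CopyOnWritePartitionStates {

    // Iterates over 'states' and adds one new entry after every element visited, simulating
    // partition discovery running during the fetch loop. Returns how many elements were visited.
    static int iterateWhileAdding(List<String> states) {
        int visited = 0;
        Iterator<String> it = states.iterator();
        while (it.hasNext()) {
            it.next();
            visited++;
            // For a LinkedList, the following it.next() call throws ConcurrentModificationException.
            // A CopyOnWriteArrayList iterator keeps reading the array it was created from.
            states.add("discovered-" + visited);
        }
        return visited;
    }

    public static void main(String[] args) {
        // Each add() copies the whole backing array: cheap reads, expensive writes.
        List<String> states = new CopyOnWriteArrayList<>(Arrays.asList("topicA-0", "topicB-0"));
        int visited = iterateWhileAdding(states);
        // The iterator saw only the 2 original entries; the 2 additions are visible afterwards.
        System.out.println(visited + " " + states.size()); // prints "2 4"
    }
}
```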
[~yuzhihong@gmail.com] could you please advise which approach is more appropriate?





> KafkaConnector Exception while fetching from multiple kafka topics
> -------------------------------------------------------------------
>
>                 Key: FLINK-9349
>                 URL: https://issues.apache.org/jira/browse/FLINK-9349
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Vishal Santoshi
>            Priority: Major
>         Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems the list subscribedPartitionStates was being modified while runFetchLoop was iterating over it.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
> 	at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
> 	at java.util.LinkedList$ListItr.next(LinkedList.java:888)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}
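The failure mode described above can be reproduced deterministically in a single thread. The sketch below is a hypothetical demo (LinkedListComodificationDemo and failsFast are illustrative names) that uses no Flink classes: modifying a LinkedList while an iterator over it is open makes the iterator's next call throw ConcurrentModificationException. When the modification comes from another thread without synchronization, this fail-fast check is only best-effort (the LinkedList Javadoc states it cannot be guaranteed), so the same race can also surface as other errors, such as the NullPointerException reported in the comment.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, single-threaded reproduction of the failure mode; it does not use Flink classes.
class LinkedListComodificationDemo {

    // Iterates over 'states' and appends one element while visiting "topicA-0", mimicking
    // addDiscoveredPartitions running while runFetchLoop is iterating. Returns true if the
    // iterator detected the modification and threw ConcurrentModificationException.
    static boolean failsFast(List<String> states) {
        try {
            for (String s : states) {
                if (s.equals("topicA-0")) {
                    states.add("topicB-0");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> states = new LinkedList<>();
        states.add("topicA-0");
        states.add("topicA-1");
        System.out.println(failsFast(states)); // prints "true"
    }
}
```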



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)