Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2016/03/01 16:29:18 UTC

[jira] [Commented] (FLINK-3368) Kafka 0.8 consumer fails to recover from broker shutdowns

    [ https://issues.apache.org/jira/browse/FLINK-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15173899#comment-15173899 ] 

Stephan Ewen commented on FLINK-3368:
-------------------------------------

As far as I know, the fix has not yet been backported to the 0.10 branch. In 0.10, recovering from a broker shutdown still requires the entire job to go into recovery.

It seems everyone is very busy with 1.0 release testing these days. If there is some breathing room after that, we could look into the backport.

> Kafka 0.8 consumer fails to recover from broker shutdowns
> ---------------------------------------------------------
>
>                 Key: FLINK-3368
>                 URL: https://issues.apache.org/jira/browse/FLINK-3368
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> It seems that the Kafka Consumer (0.8) fails to restart a job after it failed due to a Kafka broker shutdown.
> {code}
> java.lang.Exception: Unable to get last offset for partitions [FetchPartition {topic=a, partition=13, offset=-915623761776}, FetchPartition {topic=b, partition=13, offset=-915623761776}, FetchPartition {topic=c, partition=13, offset=-915623761776}, FetchPartition {topic=d, partition=13, offset=-915623761776}, FetchPartition {topic=e, partition=13, offset=-915623761776}, FetchPartition {topic=f, partition=13, offset=-915623761776}, FetchPartition {topic=g, partition=13, offset=-915623761776}].
> Exception for partition 13: kafka.common.NotLeaderForPartitionException
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> 	at java.lang.Class.newInstance(Class.java:442)
> 	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> 	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
> {code}
> I haven't understood the cause of this issue, but I'll investigate it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)