Posted to commits@samza.apache.org by "Yan Fang (JIRA)" <ji...@apache.org> on 2015/03/10 21:49:39 UTC

[jira] [Commented] (SAMZA-590) Dead Kafka broker ignores new leader

    [ https://issues.apache.org/jira/browse/SAMZA-590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14355672#comment-14355672 ] 

Yan Fang commented on SAMZA-590:
--------------------------------

+1

> Dead Kafka broker ignores new leader
> ------------------------------------
>
>                 Key: SAMZA-590
>                 URL: https://issues.apache.org/jira/browse/SAMZA-590
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-590-0.patch
>
>
> We recently discovered a bug in BrokerProxy, where the fetcher thread continues trying to fetch from a dead leader, even after a new leader has been elected.
> We were doing a slow rolling bounce of one of our Kafka clusters: take a broker offline, do extended maintenance on the machine, then bring the broker back up. We observed that when the broker went offline and the other brokers took over leadership of its partitions, the BrokerProxy thread never picked up the change.
> The containers logged this every 10s:
> {noformat}
> 2015-03-09 19:14:19 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Turn on debugging to get a full stack trace.
> 2015-03-09 19:14:19 BrokerProxy [DEBUG] Exception detail:
> java.nio.channels.ClosedChannelException
> 	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> 	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
> 	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
> 	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:174)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:145)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:132)
> 	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:131)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
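> The repeating warning comes from the fetcher thread's retry loop. Here is an illustrative sketch of the pattern (the host/port values and stub functions are hypothetical, not the actual BrokerProxy code): the exception handler only reconnects to the same fixed host and port, and ExponentialSleepStrategy's backoff caps out (10s by default), which matches the log cadence above.
> {noformat}
> import org.apache.samza.util.ExponentialSleepStrategy
>
> // Illustrative sketch -- not the actual BrokerProxy code. The host and
> // port are fixed when the thread starts, so every reconnect goes back
> // to the same (possibly dead) broker; a newly elected leader on another
> // broker is never discovered.
> val (host, port) = ("broker-1.example.com", 9092)  // hypothetical broker
> def connect(h: String, p: Int): Unit = { /* open a SimpleConsumer to h:p */ }
> def fetchMessages(): Unit = { /* issue a fetch; throws ClosedChannelException if the broker is down */ }
>
> new ExponentialSleepStrategy().run(
>   loop => {
>     fetchMessages()  // fetch from the fixed host:port
>     loop.reset()     // success: reset the backoff
>   },
>   (exception, loop) => {
>     // Only reconnects; never re-queries leader metadata and never
>     // releases the TopicAndPartitions this thread owns.
>     connect(host, port)
>   }
> )
> {noformat}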
> The problem appears to be that the BrokerProxy thread never releases ownership of its TopicAndPartitions when a consumer failure occurs. It just reconnects and fetches again. If the broker is down for a while, this leaves the thread owning TopicAndPartitions that are now led by other brokers.
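> A hypothetical sketch of the fix idea (the abdicate callback and method names here are illustrative, not necessarily what the attached patch does): on a fetch failure, the thread should hand every TopicAndPartition it owns back to the message sink, so fresh leader metadata is fetched and each partition is re-routed to the BrokerProxy for its new leader.
> {noformat}
> import kafka.common.TopicAndPartition
> import scala.collection.mutable
>
> // Hypothetical sketch -- not the actual SAMZA-590 patch.
> // 'abdicate' stands in for whatever callback re-registers a partition
> // against freshly fetched leader metadata.
> def onFetchFailure(
>   owned: mutable.Map[TopicAndPartition, Long],
>   abdicate: (TopicAndPartition, Long) => Unit): Unit = {
>   // Release each partition at its last-consumed offset so no messages
>   // are skipped when the new leader's proxy picks it up.
>   owned.foreach { case (tp, lastOffset) => abdicate(tp, lastOffset) }
>   // This thread now owns nothing; it can safely retry or shut down.
>   owned.clear()
> }
> {noformat}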



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)