You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2015/03/10 20:25:38 UTC

[jira] [Created] (SAMZA-590) Dead Kafka broker ignores new leader

Chris Riccomini created SAMZA-590:
-------------------------------------

             Summary: Dead Kafka broker ignores new leader
                 Key: SAMZA-590
                 URL: https://issues.apache.org/jira/browse/SAMZA-590
             Project: Samza
          Issue Type: Bug
          Components: kafka
    Affects Versions: 0.9.0
            Reporter: Chris Riccomini
            Assignee: Chris Riccomini
             Fix For: 0.9.0


We recently discovered a bug in BrokerProxy, where the fetcher thread will continue trying to fetch from a dead leader, even if a new leader has been elected.

We were doing a slow rolling bounce of one of our Kafka clusters, where we take a broker offline, do extended maintenance on the machine, then bring the broker back up. We observed that when the broker was moved offline, and the other brokers took over leadership for its partitions, the BrokerProxy thread never saw this update.

The containers logged this every 10s:

{noformat}
2015-03-09 19:14:19 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Turn on debugging to get a full stack trace.
2015-03-09 19:14:19 BrokerProxy [DEBUG] Exception detail:
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:174)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:145)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:132)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:131)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

The problem appears to be that the BrokerProxy thread never lets go of ownership of its TopicAndPartitions when a consumer failure occurs. It just tries to reconnect and fetch again. If the broker is down for a while, this results in the thread owning TopicAndPartitions that are now lead by other brokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)