Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2015/03/10 20:31:38 UTC

[jira] [Updated] (SAMZA-590) Dead Kafka broker ignores new leader

     [ https://issues.apache.org/jira/browse/SAMZA-590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-590:
----------------------------------
    Attachment: SAMZA-590-0.patch

Attaching patch. RB at:

https://reviews.apache.org/r/31909

# Moved the "abdicate" method to be a top-level method within the class.
# Added an abdicateAll method that abdicates all TopicAndPartitions.
# Invoke abdicateAll whenever a consumer failure occurs.
# Wrote a unit test to validate that the BrokerProxy relinquishes ownership for all TopicAndPartitions on failure, and that it continues to refresh dropped TopicAndPartitions (in case its broker comes alive again) afterwards.

I also ran this code in one of our jobs on our unstable cluster (the one we're bouncing, as described in the issue description), and found that the BrokerProxy properly relinquished ownership:

{noformat}
2015-03-10 17:42:31 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
2015-03-10 17:42:31 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Abdicating for [topic1,14]
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 VerifiableProperties [INFO] Verifying properties
2015-03-10 17:42:31 VerifiableProperties [INFO] Property client.id is overridden to samza_consumer-job_name-i001-1425999995883-2
2015-03-10 17:42:31 VerifiableProperties [INFO] Property metadata.broker.list is overridden to broker-vip:10251
2015-03-10 17:42:31 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 60000
2015-03-10 17:42:31 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:broker-vip,port:10251 with correlation id 1 for 1 topic(s) Set(topic1)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 SyncProducer [INFO] Connected to broker-vip:10251 for producing
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 SyncProducer [INFO] Disconnecting from broker-vip:10251
2015-03-10 17:42:31 BrokerProxy [INFO] Creating new SimpleConsumer for host broker1:10251 for system kafka
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 GetOffset [INFO] Validating offset 18198544 for topic and partition [topic1,14]
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
2015-03-10 17:42:32 GetOffset [INFO] Able to successfully read from offset 18198544 for topic and partition [topic1,14]. Using it to instantiate consumer.
2015-03-10 17:42:32 BrokerProxy [INFO] Starting BrokerProxy for broker1:10251
2015-03-10 17:42:32 BrokerProxy [INFO] Creating new SimpleConsumer for host broker2:10251 for system kafka
{noformat}
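
For readers who don't have the patch handy, the ownership handoff described in the list above (abdicate everything on a consumer failure, so that the partitions can be reassigned after a metadata refresh) might look roughly like the sketch below. This is an illustration only, not the patch: it is written in Java rather than the Scala of the real BrokerProxy, and apart from the method names "abdicate" and "abdicateAll", every name in it (OwnershipSketch, AbdicationListener, Fetch, fetchOnce, addTopicPartition, ownedCount) is hypothetical. Threading, backoff, and the leader lookup itself are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a broker proxy's partition bookkeeping.
// Only the method names abdicate/abdicateAll come from the patch notes;
// everything else is an assumption made for illustration.
class OwnershipSketch {
    // Hypothetical callback that hands a partition back to whoever assigns leaders.
    interface AbdicationListener {
        void abdicate(String topicAndPartition, long nextOffset);
    }

    // Hypothetical fetch operation; it may fail with a checked exception
    // such as java.nio.channels.ClosedChannelException.
    interface Fetch {
        void run() throws Exception;
    }

    // Partitions currently owned by this proxy, mapped to the next offset to read.
    private final Map<String, Long> nextOffsets = new HashMap<>();
    private final AbdicationListener listener;

    OwnershipSketch(AbdicationListener listener) {
        this.listener = listener;
    }

    void addTopicPartition(String topicAndPartition, long nextOffset) {
        nextOffsets.put(topicAndPartition, nextOffset);
    }

    // Give up a single partition and report where reading should resume.
    void abdicate(String topicAndPartition) {
        Long offset = nextOffsets.remove(topicAndPartition);
        if (offset != null) {
            listener.abdicate(topicAndPartition, offset);
        }
    }

    // Give up every partition. The key set is copied first because
    // abdicate() removes entries from the map while we iterate.
    void abdicateAll() {
        for (String topicAndPartition : new ArrayList<>(nextOffsets.keySet())) {
            abdicate(topicAndPartition);
        }
    }

    int ownedCount() {
        return nextOffsets.size();
    }

    // One fetch attempt: on any failure, release all partitions instead of
    // holding on to them and retrying against a possibly dead broker.
    void fetchOnce(Fetch fetch) {
        try {
            fetch.run();
        } catch (Exception e) {
            abdicateAll();
        }
    }
}
```

In this sketch, a listener that received [topic1,14] at offset 18198544 would be expected to look up the current leader and hand the partition to the proxy for that broker, which is the behaviour the "Abdicating for" and "Refreshing brokers for" log lines above suggest.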

> Dead Kafka broker ignores new leader
> ------------------------------------
>
>                 Key: SAMZA-590
>                 URL: https://issues.apache.org/jira/browse/SAMZA-590
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-590-0.patch
>
>
> We recently discovered a bug in BrokerProxy, where the fetcher thread will continue trying to fetch from a dead leader, even if a new leader has been elected.
> We were doing a slow rolling bounce of one of our Kafka clusters, where we take a broker offline, do extended maintenance on the machine, then bring the broker back up. We observed that when the broker was moved offline, and the other brokers took over leadership for its partitions, the BrokerProxy thread never saw this update.
> The containers logged this every 10s:
> {noformat}
> 2015-03-09 19:14:19 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Turn on debugging to get a full stack trace.
> 2015-03-09 19:14:19 BrokerProxy [DEBUG] Exception detail:
> java.nio.channels.ClosedChannelException
> 	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> 	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
> 	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
> 	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:174)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:145)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:132)
> 	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:131)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The problem appears to be that the BrokerProxy thread never lets go of ownership of its TopicAndPartitions when a consumer failure occurs. It just tries to reconnect and fetch again. If the broker is down for a while, this results in the thread owning TopicAndPartitions that are now led by other brokers.


