You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Nelson Elhage (JIRA)" <ji...@apache.org> on 2016/10/28 22:29:58 UTC

[jira] [Updated] (KAFKA-4358) Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread

     [ https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nelson Elhage updated KAFKA-4358:
---------------------------------
    Status: Open  (was: Patch Available)

> Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4358
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4358
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 0.10.0.1
>            Reporter: Nelson Elhage
>            Priority: Minor
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks waiting for idle replication fetcher threads to die before responding to the message and being able to service new produce requests.
> If requests to a broker start blackholing (e.g. due to network failure, or due to the broker hanging), shutting down the `ReplicaFetcherThread` can take a long time (around 30s in my testing), blocking recovery of any partitions previously lead by that broker.
> This is a very similar issue to KAFKA-612.
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replicated.topic
> {code}
> Identify the leader, and (for simplicity in interpreting the event) make sure it's not the same as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated.topic
> {code}
> Start a stream of produce events (with a shortened timeout so we get faster visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic --broker-list localhost:9092 --producer.config $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
> The producer will log errors for about 30 seconds, and then recover. However, if we read logs, we'll see (excerpting key log lines from `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request correlationId 2 from controller 2 epoch 8 starting the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] state from OnlinePartition to OfflinePartition (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request correlationId 18 from controller 1 epoch 11 starting the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of become-leader request from controller 1 epoch 11 with correlation id 18 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request correlationId 18 from controller 1 epoch 11 for the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:56,058] TRACE Controller 1 epoch 11 received response {error_code=0,partitions=[{topic=replicated.topic,partition=0,error_code=0}]} for a request sent to broker qa-dev1.northwest.stripe.io:9093 (id: 2 rack: null) (state.change.logger)
> {code}
> Note the ~23s pause between broker 2 logging completion of the LeaderAndIsr request, and the controller logging receipt. If we look at broker 2 specifically now, we find
> {code}
> [2016-10-28 20:48:33,025] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions replicated.topic-0 (kafka.server.ReplicaFetcherManager)
> [2016-10-28 20:48:33,026] INFO [ReplicaFetcherThread-0-3], Shutting down (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Stopped  (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Shutdown completed (kafka.server.ReplicaFetcherThread)
> {code}
> Which timestamps exactly match the "missing" seconds. It also aligns with the first successful publish from my publish job, which happened at `2016-10-28 20:48:57.555`
> Aggressively dropping `request.timeout.ms` speeds up this failover process, but it seems that we should be able to recover those ~25s of unavailability without having to drop any additional timeouts, by either interrupting that thread somehow or not having to block on its shutdown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)