Posted to users@kafka.apache.org by Emmett Butler <em...@parsely.com> on 2018/08/22 22:00:53 UTC

Fwd: coordinator load + OffsetFetchRequest error = consumption failure

Hi Kafka users,

*tl;dr questions:*

*1. Is it normal or expected for the coordinator load state to last for 6
hours? Is this load time affected by log retention settings, message
production rate, or other parameters?*
*2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming
only from the non-erroring partitions? Pykafka's insistence that all
partitions return successful OffsetFetchResponses can be a source of
consumption downtime.*
*3. Why does Kafka sometimes select a non-synced replica as the preferred
replica during coordinator loads? How can I reassign partition leaders to
replicas in the ISR?*
*4. Are all of these questions moot because I should just be using a newer
version of Kafka than 0.8.2.1?*


I'm using Kafka 0.8.2.1, running a topic with 200 partitions and RF=3, with
log retention set to about 1GB.

An unknown event caused the cluster to enter the "coordinator load" or
"group load" state. A few signals made this apparent: the pykafka-based
consumers began to fail
<https://github.com/Parsely/pykafka/blob/858554029830e15cfa6d15df002d1772672d8ee0/pykafka/simpleconsumer.py#L643>
during OffsetFetchRequests with error code 14 COORDINATOR_LOAD_IN_PROGRESS
for some subset of partitions. These errors were triggered when consuming
with a consumer group that had existed since before the coordinator load.
In broker logs, messages like this appeared:

[2018-05...] ERROR Controller 17 epoch 20 initiated state change for
partition [my.cool.topic,144] from OnlinePartition to OnlinePartition
failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing
leader for partition [my.cool.topic,144] due to: Preferred replica 11 for
partition [my.cool.topic,144] is either not alive or not in the isr.
Current leader and ISR: [{"leader":12,"leader_epoch":7,"isr":[12,13]}].

For some reason, Kafka decided that replica 11 was the "preferred" replica
despite the fact that it was not in the ISR. To my knowledge, consumption
*could* continue uninterrupted from either replica 12 or 13 while 11
resynchronized - it's not clear why Kafka chose a non-synced replica as the
preferred leader.
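
A quick way to spot that condition across the topic is something like the
pykafka sketch below (illustrative only: the host string is a placeholder,
and the partition metadata attributes are my recollection of pykafka's API,
so they may differ by version):

from pykafka import KafkaClient

client = KafkaClient(hosts="broker10:9092")  # placeholder host
topic = client.topics[b"my.cool.topic"]

for pid, partition in sorted(topic.partitions.items()):
    replica_ids = [b.id for b in partition.replicas]
    isr_ids = [b.id for b in partition.isr]
    # The "preferred" replica is the first one in the assignment list.
    if replica_ids and replica_ids[0] not in isr_ids:
        print("partition %d: preferred replica %d not in ISR %r (leader %d)"
              % (pid, replica_ids[0], isr_ids, partition.leader.id))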

The above-described behavior lasted for about 6 hours, during which the
pykafka fetch_offsets error made message consumption impossible. While the
coordinator load was still in progress, other consumer groups were able to
consume the topic without error. In fact, the eventual fix was to restart
the broken consumers with a new consumer_group name.
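
For what it's worth, the workaround we landed on looks roughly like the
sketch below: retry consumer construction while the coordinator is loading,
then fall back to a fresh consumer_group name if it never recovers. Hosts,
topic/group names, and the retry budget are placeholders, and the exception
class caught here is my reading of the linked simpleconsumer.py code, so
treat this as illustrative rather than tested:

import time

from pykafka import KafkaClient
from pykafka.exceptions import GroupLoadInProgress  # protocol error code 14

client = KafkaClient(hosts="broker10:9092,broker11:9092")  # placeholders
topic = client.topics[b"my.cool.topic"]


def build_consumer(group):
    # Fetches committed offsets for `group` on startup, which is where the
    # COORDINATOR_LOAD_IN_PROGRESS error surfaced for us.
    return topic.get_simple_consumer(consumer_group=group,
                                     auto_commit_enable=True)


consumer = None
for _ in range(10):
    try:
        consumer = build_consumer(b"my.original.group")
        break
    except GroupLoadInProgress:
        # Coordinator is still loading offsets for this group; back off.
        time.sleep(60)
else:
    # The fix that actually unblocked us: a brand-new group name, at the
    # cost of abandoning the old group's committed offsets.
    consumer = build_consumer(b"my.original.group.v2")

for message in consumer:
    if message is not None:
        print(message.offset, message.value)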

*Questions*

1. Is it normal or expected for the coordinator load state to last for 6
hours? Is this load time affected by log retention settings, message
production rate, or other parameters?
2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming
only from the non-erroring partitions? Pykafka's insistence that all
partitions return successful OffsetFetchResponses can be a source of
consumption downtime.
3. Why does Kafka sometimes select a non-synced replica as the preferred
replica during coordinator loads? How can I reassign partition leaders to
replicas in the ISR? (A sketch of what I have in mind follows question 4.)
4. Are all of these questions moot because I should just be using a newer
version of Kafka?
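
For question 3 specifically, what I have in mind is feeding the stock
kafka-reassign-partitions.sh / kafka-preferred-replica-election.sh tools a
replica list restricted to the ISR, generated along these lines (a sketch
only, not something I've verified against the cluster; replica IDs come from
the log excerpt above, and the ZooKeeper string and paths are placeholders):

import json

ISR = [12, 13]  # from "isr":[12,13] in the controller log above

reassignment = {
    "version": 1,
    "partitions": [
        {"topic": "my.cool.topic", "partition": 144, "replicas": ISR},
    ],
}
with open("reassign-144.json", "w") as f:
    json.dump(reassignment, f)

election = {"partitions": [{"topic": "my.cool.topic", "partition": 144}]}
with open("elect-144.json", "w") as f:
    json.dump(election, f)

# Then, from a broker host (ZK string and paths are placeholders):
#   bin/kafka-reassign-partitions.sh --zookeeper <zk>/kafka5 \
#       --reassignment-json-file reassign-144.json --execute
#   bin/kafka-preferred-replica-election.sh --zookeeper <zk>/kafka5 \
#       --path-to-json-file elect-144.json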

Thanks for your help, and please let me know if I can provide additional
information or answer additional questions.

Broker config options:

broker.id=10
port=9092
zookeeper.connect=****/kafka5

log.dirs=*****
delete.topic.enable=true
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
message.max.bytes=1000000
auto.create.topics.enable=false
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=96
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
num.io.threads=8
socket.request.max.bytes=104857600
num.replica.fetchers=4
controller.message.queue.size=10
num.partitions=8
log.flush.interval.ms=60000
log.flush.interval.messages=60000
log.flush.scheduler.interval.ms=2000
num.network.threads=8
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
controlled.shutdown.enable=true

-- 
Emmett Butler | Senior Software Engineer