Posted to users@kafka.apache.org by Gleb Zhukov <gz...@iponweb.net> on 2015/09/01 15:26:25 UTC

Kafka loses data after one broker reboot

Hi, All!
We use 3 Kafka brokers with replication factor 2. Today we were doing a
partition reassignment and one of our brokers was rebooted due to a hardware
problem. After the broker came back up, we found that our consumer no longer
works and fails with errors like:

ERROR java.lang.AssertionError: assumption failed: 765994 exceeds 6339
ERROR java.lang.AssertionError: assumption failed: 1501252 exceeds 416522
ERROR java.lang.AssertionError: assumption failed: 950819 exceeds 805377

Some logs from the brokers:

[2015-09-01 13:00:16,976] ERROR [Replica Manager on Broker 61]: Error when
processing fetch request for partition [avro_match,27] offset 208064729
from consumer with correlation id 0. Possible cause: Request for offset
208064729 but we only have log segments in the range 209248794 to
250879159. (kafka.server.ReplicaManager)
[2015-09-01 13:01:17,943] ERROR [Replica Manager on Broker 45]: Error when
processing fetch request for partition [logs.conv_expired,20] offset 454
from consumer with correlation id 0. Possible cause: Request for offset 454
but we only have log segments in the range 1349769 to 1476231.
(kafka.server.ReplicaManager)

[2015-09-01 13:21:23,896] INFO Partition [logs.avro_event,29] on broker 61:
Expanding ISR for partition [logs.avro_event,29] from 61,77 to 61,77,45
(kafka.cluster.Partition)
[2015-09-01 13:21:23,899] INFO Partition [logs.imp_tstvssamza,6] on broker
61: Expanding ISR for partition [logs.imp_tstvssamza,6] from 61,77 to
61,77,45 (kafka.cluster.Partition)
[2015-09-01 13:21:23,902] INFO Partition [__consumer_offsets,30] on broker
61: Expanding ISR for partition [__consumer_offsets,30] from 61,77 to
61,77,45 (kafka.cluster.Partition)
[2015-09-01 13:21:23,905] INFO Partition [logs.test_imp,44] on broker 61:
Expanding ISR for partition [logs.test_imp,44] from 61 to 61,45
(kafka.cluster.Partition)

Looks like we lost part of our data.

Also, Kafka started replicating seemingly random partitions (the bad broker
was already back up and running, and log recovery had completed):

root@kafka2d:~# date && /usr/lib/kafka/bin/kafka-topics.sh --zookeeper
zk-pool.gce-eu.kafka/kafka --under-replicated-partitions --describe  | wc -l
Tue Sep  1 13:02:24 UTC 2015
431
root@kafka2d:~# date && /usr/lib/kafka/bin/kafka-topics.sh --zookeeper
zk-pool.gce-eu.kafka/kafka --under-replicated-partitions --describe  | wc -l
Tue Sep  1 13:02:37 UTC 2015
386
root@kafka2d:~# date && /usr/lib/kafka/bin/kafka-topics.sh --zookeeper
zk-pool.gce-eu.kafka/kafka --under-replicated-partitions --describe  | wc -l
Tue Sep  1 13:02:48 UTC 2015
501
root@kafka2d:~# date && /usr/lib/kafka/bin/kafka-topics.sh --zookeeper
zk-pool.gce-eu.kafka/kafka --under-replicated-partitions --describe  | wc -l
Tue Sep  1 13:02:58 UTC 2015
288
root@kafka2d:~# date && /usr/lib/kafka/bin/kafka-topics.sh --zookeeper
zk-pool.gce-eu.kafka/kafka --under-replicated-partitions --describe  | wc -l
Tue Sep  1 13:03:08 UTC 2015
363

Could anyone throw some light on this situation?

We use ext4 on our brokers and these settings:

port=9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mnt/kafka/kafka-data
num.partitions=1
default.replication.factor=2
message.max.bytes=10000000
replica.fetch.max.bytes=10000000
auto.create.topics.enable=false
log.roll.hours=24
num.replica.fetchers=4
auto.leader.rebalance.enable=true
log.retention.hours=168
log.segment.bytes=134217728
log.retention.check.interval.ms=60000
log.cleaner.enable=false
delete.topic.enable=true
zookeeper.connect=zk1d.gce-eu.kafka:2181,zk2d.gce-eu.kafka:2181,zk3d.gce-eu.kafka:2181/kafka
zookeeper.connection.timeout.ms=6000

Should I change any of these parameters, or should a cluster with 3 brokers
and replication factor 2 already prevent such issues? In particular, I was
wondering about the flush settings:

log.flush.interval.ms
log.flush.interval.messages
log.flush.scheduler.interval.ms

THX!


-- 
Best regards,
Gleb Zhukov

Re: Kafka loses data after one broker reboot

Posted by Gleb Zhukov <gz...@iponweb.net>.
Hi, Lukas.
I don't think so: we lost data at offsets larger than the offsets that still
exist on the brokers, whereas retention only deletes data at offsets smaller
than the existing ones.
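
For what it's worth, one way to check this (the broker address, topic and
partition below are just placeholders) is to compare the consumer's offset
against the earliest and latest offsets the broker still has, e.g. with
GetOffsetShell:

# earliest offset still on the broker (--time -2)
/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list kafka1d.gce-eu.kafka:9092 --topic avro_match --partitions 27 --time -2
# latest offset on the broker (--time -1)
/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list kafka1d.gce-eu.kafka:9092 --topic avro_match --partitions 27 --time -1

If the consumer's offset is below the earliest offset, retention could explain
it; if it is above the latest offset, the log was truncated at the end, which
retention cannot explain.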

Hi, Gwen.
Thanks a lot. We will sleep on it.


-- 
Best regards,
Gleb Zhukov

Re: Kafka loses data after one broker reboot

Posted by Gwen Shapira <gw...@confluent.io>.
There is another edge-case that can lead to this scenario. It is described
in detail in KAFKA-2134, and I'll copy Becket's excellent summary here for
reference:

1) Broker A (the leader) has committed offsets up to 5000.
2) Broker B (a follower) has committed offsets up to 3000 (it is still in the
ISR because of replica.lag.max.messages).
***network glitch happens***
3) Broker B becomes the leader, Broker A becomes a follower.
4) Broker A fetches from Broker B and gets an OffsetOutOfRangeException (since
it is trying to fetch offsets beyond 5000 while B only has up to 3000).
4.1) Broker B receives some new messages and its log end offset becomes 6000.
4.2) Broker A tries to handle the exception, so it checks the log end offset on
Broker B (followers that get an OffsetOutOfRangeException try to reset to the
leader's LogEndOffset), finds that it is greater than Broker A's own log end
offset, and truncates itself to the starting offset of Broker B (causing
significant re-replication).

You can avoid data loss in these scenarios by setting acks = -1 (i.e. "all")
on all producers and unclean.leader.election.enable = false on the brokers.
This guarantees that messages that were "acked" to producers exist on all
in-sync replicas, and that an out-of-sync replica will never become the leader
of a partition. So when a scenario like the one above occurs, it means that
messages 3000-5000 (which were lost) were never acked to the producers (and
hopefully the producers will retry sending them).
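
As a rough sketch (the broker list and values below are just placeholders),
this translates to something like the following in the producer config:

# producer configuration (new Java producer); wait for all in-sync replicas
# to acknowledge (acks=all is the same as acks=-1) and retry transient failures
bootstrap.servers=kafka1d.gce-eu.kafka:9092,kafka2d.gce-eu.kafka:9092
acks=all
retries=3

and to this line in the brokers' server.properties (it can also be overridden
per topic):

unclean.leader.election.enable=false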

Gwen

Re: Kafka loses data after one broker reboot

Posted by Lukas Steiblys <lu...@doubledutch.me>.
Hi Gleb,

Are you sure you were not trying to read data that had already been deleted by
its retention policy?

Lukas
