You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mark Smith <ma...@qq.is> on 2016/11/18 01:39:37 UTC

Investigating apparent data loss during preferred replica election

Hey folks,

I work at Dropbox and I was doing some maintenance yesterday and it
looks like we lost some committed data during a preferred replica
election. As far as I understand this shouldn't happen, but I have a
theory and want to run it by ya'll.

Preamble:
* Kafka 0.9.0.1
* required.acks = -1 (All)
* min.insync.replicas = 2 (only 2 replicas for the partition, so we
require both to have the data)
* consumer is Kafka Connect
* 1400 topics, total of about 15,000 partitions
* 30 brokers

I was performing some rolling restarts of brokers yesterday as part of
our regular DRT (disaster testing) process and at the end that always
leaves many partitions that need to be failed back to the preferred
replica. There were about 8,000 partitions that needed moving. I started
the election in Kafka Manager and it worked, but it looks like 4 of
those 8,000 partitions experienced some relatively small amount of data
loss at the tail.

From the Kafka Connect point of view, we saw a handful of these:

[2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5]
INFO Fetch offset 67614479952 is out of range, resetting offset
(o.a.k.c.c.i.Fetcher:595)

I believe that was because it asked the new leader for data and the new
leader had less data than the old leader. Indeed, the old leader became
a follower and immediately truncated:

2016-11-17 02:55:27,237 INFO log.Log: Truncating log
goscribe.client-host_activity-21 to offset 67614479601.

Given the above production settings I don't know why KC would ever see
an OffsetOutOfRange error but this caused KC to reset to the beginning
of the partition. Various broker logs for the failover paint the
following timeline:
https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6

My current working theory that I'd love eyes on:

  1. Leader receives produce request and appends to log, incrementing
  LEO, but given the durability requirements the HW is not incremented
  and the produce response is delayed (normal)

  2. Replica sends Fetch request to leader as part of normal replication
  flow

  3. Leader increments HW when it STARTS to respond to the Fetch request
  (per fetchMessages in ReplicaManager.scala), so the HW is updated as
  soon as we've prepared messages for response -- importantly the HW is
  updated even though the replica has not yet actually seen the
  messages, even given the durability settings we've got

  4. Meanwhile, Kafka Connect sends Fetch request to leader and receives
  the messages below the new HW, but the messages have not actually been
  received by the replica yet still

  5. Preferred replica election begins (oh the travesty!)

  6. Replica starts the become-leader process and makeLeader removes
  this partition from partitionMap, which means when the response comes
  in finally, we ignore it (we discard the old-leader committed
  messages)

  7. Old-leader starts become-follower process and truncates to the HW
  of the new-leader i.e. the old-leader has now thrown away data it had
  committed and given out moments ago

  8. Kafka Connect sends Fetch request to the new-leader but its offset
  is now greater than the HW of the new-leader, so we get the
  OffsetOutOfRange error and restart

Can someone tell me whether or not this is plausible? If it is, is there
a known issue/bug filed for it? I'm not exactly sure what the solution
is, but it does seem unfortunate that a normal operation (leader
election with both brokers alive and well) can result in the loss of
committed messages.

And, if my theory doesn't hold, can anybody explain what happened? I'm
happy to provide more logs or whatever.

Thanks!


-- 
Mark Smith
mark@qq.is

Re: Investigating apparent data loss during preferred replica election

Posted by Mark Smith <ma...@qq.is>.
Correct, we've disabled unclean leader election. There were also no log
messages from an unclean election. I believe that Kafka thinks it
performed a clean election and still lost data.


--

Mark Smith

mark@qq.is





On Thu, Nov 17, 2016, at 06:23 PM, Tauzell, Dave wrote:

> Do you have:

>

> Unclean.leader.election.enable = false ?

>

> Dave

>

> > On Nov 17, 2016, at 19:39, Mark Smith <ma...@qq.is> wrote:

> >

> > Hey folks,

> >

> > I work at Dropbox and I was doing some maintenance yesterday and it
> > looks like we lost some committed data during a preferred replica

> > election. As far as I understand this shouldn't happen, but I have a
> > theory and want to run it by ya'll.

> >

> > Preamble:

> > * Kafka 0.9.0.1

> > * required.acks = -1 (All)

> > * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> > require both to have the data)

> > * consumer is Kafka Connect

> > * 1400 topics, total of about 15,000 partitions

> > * 30 brokers

> >

> > I was performing some rolling restarts of brokers yesterday as
> > part of
> > our regular DRT (disaster testing) process and at the end that
> > always
> > leaves many partitions that need to be failed back to the preferred
> > replica. There were about 8,000 partitions that needed moving. I
> > started
> > the election in Kafka Manager and it worked, but it looks like 4 of
> > those 8,000 partitions experienced some relatively small amount
> > of data
> > loss at the tail.

> >

> > From the Kafka Connect point of view, we saw a handful of these:

> >

> > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-
> > 5]
> > INFO Fetch offset 67614479952 is out of range, resetting offset

> > (o.a.k.c.c.i.Fetcher:595)

> >

> > I believe that was because it asked the new leader for data and
> > the new
> > leader had less data than the old leader. Indeed, the old leader
> > became
> > a follower and immediately truncated:

> >

> > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log

> > goscribe.client-host_activity-21 to offset 67614479601.

> >

> > Given the above production settings I don't know why KC would
> > ever see
> > an OffsetOutOfRange error but this caused KC to reset to the
> > beginning
> > of the partition. Various broker logs for the failover paint the

> > following timeline:

> > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6

> >

> > My current working theory that I'd love eyes on:

> >

> >  1. Leader receives produce request and appends to log, incrementing
> >  LEO, but given the durability requirements the HW is not
> >  incremented
> >  and the produce response is delayed (normal)

> >

> >  2. Replica sends Fetch request to leader as part of normal
> >     replication
> >  flow

> >

> >  3. Leader increments HW when it STARTS to respond to the Fetch
> >     request
> >  (per fetchMessages in ReplicaManager.scala), so the HW is
> >  updated as
> >  soon as we've prepared messages for response -- importantly the
> >  HW is
> >  updated even though the replica has not yet actually seen the

> >  messages, even given the durability settings we've got

> >

> >  4. Meanwhile, Kafka Connect sends Fetch request to leader and
> >     receives
> >  the messages below the new HW, but the messages have not
> >  actually been
> >  received by the replica yet still

> >

> >  5. Preferred replica election begins (oh the travesty!)

> >

> >  6. Replica starts the become-leader process and makeLeader removes
> >  this partition from partitionMap, which means when the response
> >  comes
> >  in finally, we ignore it (we discard the old-leader committed

> >  messages)

> >

> >  7. Old-leader starts become-follower process and truncates to the
> >     HW
> >  of the new-leader i.e. the old-leader has now thrown away data
> >  it had
> >  committed and given out moments ago

> >

> >  8. Kafka Connect sends Fetch request to the new-leader but its
> >     offset
> >  is now greater than the HW of the new-leader, so we get the

> >  OffsetOutOfRange error and restart

> >

> > Can someone tell me whether or not this is plausible? If it is,
> > is there
> > a known issue/bug filed for it? I'm not exactly sure what the
> > solution
> > is, but it does seem unfortunate that a normal operation (leader

> > election with both brokers alive and well) can result in the loss of
> > committed messages.

> >

> > And, if my theory doesn't hold, can anybody explain what
> > happened? I'm
> > happy to provide more logs or whatever.

> >

> > Thanks!

> >

> >

> > --

> > Mark Smith

> > mark@qq.is

> This e-mail and any files transmitted with it are confidential, may

> contain sensitive information, and are intended solely for the
> use of the
> individual or entity to whom they are addressed. If you have received
> this e-mail in error, please notify the sender by reply e-mail

> immediately and destroy all copies of the e-mail and any attachments.


Re: Investigating apparent data loss during preferred replica election

Posted by "Tauzell, Dave" <Da...@surescripts.com>.
Do you have:

Unclean.leader.election.enable = false ?

Dave

> On Nov 17, 2016, at 19:39, Mark Smith <ma...@qq.is> wrote:
>
> Hey folks,
>
> I work at Dropbox and I was doing some maintenance yesterday and it
> looks like we lost some committed data during a preferred replica
> election. As far as I understand this shouldn't happen, but I have a
> theory and want to run it by ya'll.
>
> Preamble:
> * Kafka 0.9.0.1
> * required.acks = -1 (All)
> * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> require both to have the data)
> * consumer is Kafka Connect
> * 1400 topics, total of about 15,000 partitions
> * 30 brokers
>
> I was performing some rolling restarts of brokers yesterday as part of
> our regular DRT (disaster testing) process and at the end that always
> leaves many partitions that need to be failed back to the preferred
> replica. There were about 8,000 partitions that needed moving. I started
> the election in Kafka Manager and it worked, but it looks like 4 of
> those 8,000 partitions experienced some relatively small amount of data
> loss at the tail.
>
> From the Kafka Connect point of view, we saw a handful of these:
>
> [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5]
> INFO Fetch offset 67614479952 is out of range, resetting offset
> (o.a.k.c.c.i.Fetcher:595)
>
> I believe that was because it asked the new leader for data and the new
> leader had less data than the old leader. Indeed, the old leader became
> a follower and immediately truncated:
>
> 2016-11-17 02:55:27,237 INFO log.Log: Truncating log
> goscribe.client-host_activity-21 to offset 67614479601.
>
> Given the above production settings I don't know why KC would ever see
> an OffsetOutOfRange error but this caused KC to reset to the beginning
> of the partition. Various broker logs for the failover paint the
> following timeline:
> https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
>
> My current working theory that I'd love eyes on:
>
>  1. Leader receives produce request and appends to log, incrementing
>  LEO, but given the durability requirements the HW is not incremented
>  and the produce response is delayed (normal)
>
>  2. Replica sends Fetch request to leader as part of normal replication
>  flow
>
>  3. Leader increments HW when it STARTS to respond to the Fetch request
>  (per fetchMessages in ReplicaManager.scala), so the HW is updated as
>  soon as we've prepared messages for response -- importantly the HW is
>  updated even though the replica has not yet actually seen the
>  messages, even given the durability settings we've got
>
>  4. Meanwhile, Kafka Connect sends Fetch request to leader and receives
>  the messages below the new HW, but the messages have not actually been
>  received by the replica yet still
>
>  5. Preferred replica election begins (oh the travesty!)
>
>  6. Replica starts the become-leader process and makeLeader removes
>  this partition from partitionMap, which means when the response comes
>  in finally, we ignore it (we discard the old-leader committed
>  messages)
>
>  7. Old-leader starts become-follower process and truncates to the HW
>  of the new-leader i.e. the old-leader has now thrown away data it had
>  committed and given out moments ago
>
>  8. Kafka Connect sends Fetch request to the new-leader but its offset
>  is now greater than the HW of the new-leader, so we get the
>  OffsetOutOfRange error and restart
>
> Can someone tell me whether or not this is plausible? If it is, is there
> a known issue/bug filed for it? I'm not exactly sure what the solution
> is, but it does seem unfortunate that a normal operation (leader
> election with both brokers alive and well) can result in the loss of
> committed messages.
>
> And, if my theory doesn't hold, can anybody explain what happened? I'm
> happy to provide more logs or whatever.
>
> Thanks!
>
>
> --
> Mark Smith
> mark@qq.is
This e-mail and any files transmitted with it are confidential, may contain sensitive information, and are intended solely for the use of the individual or entity to whom they are addressed. If you have received this e-mail in error, please notify the sender by reply e-mail immediately and destroy all copies of the e-mail and any attachments.

Re: Investigating apparent data loss during preferred replica election

Posted by Mark Smith <ma...@qq.is>.
Jun,


I see what's going on -- the leader doesn't update its HW as soon as the
follower has requested the messages, it updates when the follower
requests the _next_ messages. I.e., it infers that because the follower
requested from offset 38 that everything <= 37 is durable.

This makes sense and means my understanding was wrong and this wasn't an
issue. Thanks for helping clear that up.


This means there is still an unresolved issue, unfortunately. I can
replicate the conditions that led to it and see if I can reproduce the
problem. If so, I'll update this thread again.


--

Mark Smith

mark@qq.is





On Mon, Nov 21, 2016, at 06:59 PM, Jun Rao wrote:

> Hi, Mark,

> 

> So you did the manual leader election after the cluster is stabilized
> (i.e, all replicas are in sync)? Then, it may not be related to the
> issue that I described.
> 

> If there is just a single leadership change, what you described
> shouldn't happen by design. I modified your steps to the following
> according to the design and I am not sure how the message can be lost.
> 

> 1. Starting point: Leader and Replica both only have up to message
>    #36, HW is at 37
> 2. Client produces new message with required.acks=all at offset 37

> 3.  the produce request is blocked

> 4. Replica fetches messages at offset 37

> 5. Leader's HW still at 37 and the produce request still blocked

> 5.1 Replica receives message at 37 and appends it to local log.

> 5.2 Replica fetches messages at offset 38

> 5.3 Leader's HW moves to 38 and the produce request is unblocked

> 5.4 Replica's HW still at 37

> 6. PREFERRED REPLICA ELECTION BEGIN

> 8. Replica becomes leader; no truncation is done; message at offset 37
>    is preserved; HW still at 37
> 9. Connect issues a fetch request at 38 to replica and gets an empty
>    response instead of OffsetOutOfRangeException since the log end
>    offset is at 38.
> 9. Leader becomes follower; truncate to HW 38, keeping message at
>    offset 37.
> 10. Leader starts fetch from offset 38

> 11. Replica moves HW to 38

> 12. Message #37 is preserved in both replicas and is not lost

> 

> 

> BTW, do you have unclean leader election disabled? Is this issue
> reproducible? If so, we can enable some debug level logging to see
> what's causing this. Now, I am also not sure if this is a broker side
> issue or a consumer side issue.
> 

> Thanks,

> 

> Jun

> 

> 

> On Mon, Nov 21, 2016 at 5:20 PM, Mark Smith <ma...@qq.is> wrote:

>> __

>> Jun,

>> 

>> Yeah, I probably have an off-by-one issue in the HW description. I
>> think you could pick any number here and the problem remains -- could
>> you read through the steps I posted and see if they logically make
>> sense, numbers aside?
>> 

>> We definitely lost data in 4 partitions of the 8,000 that were
>> elected, and there was only a single election for each partition. We
>> had done a rolling restart hours before, but that had been done for a
>> long time and everything was stable. We do not allow automatic
>> election, it's a manual process that we initiate after the cluster
>> has stabilized.
>> 

>> So in this case, I still don't think any discussion about multiple-
>> failovers is germane to the problem we saw. Each of our partitions
>> only had a single failover, and yet 4 of them still truncated
>> committed data.
>> 

>> --

>> Mark Smith

>> mark@qq.is

>> 

>> 

>> On Mon, Nov 21, 2016, at 05:12 PM, Jun Rao wrote:

>>> Hi, Mark,

>>> 

>>> Hmm, the committing of a message at offset X is the equivalent of
>>> saying that the HW is at offset X + 1. So, in your example, if the
>>> producer publishes a new message at offset 37, this message won't be
>>> committed (i.e., HW moves to offset 38) until the leader sees the
>>> follower fetch from offset 38 (not offset 37). At that point, the
>>> follower would have received message at offset 37 in the fetch
>>> response and appended that message to its local log. If the follower
>>> now becomes the new leader, message at offset 37 is preserved.
>>> 

>>> The problem that I described regarding data loss can happen during a
>>> rolling restart. Suppose that you have 3 replicas A, B, and C. Let's
>>> say A is the preferred the leader, but during the deployment, the
>>> leader gets moved to replica B at some point and all 3 replicas are
>>> in sync. A new message is produced at offset 37 and is committed
>>> (leader's HW =38). However, the HW in replica A is still at 37. Now,
>>> we try to shutdown broker B and the leader gets moved to replica C.
>>> Replica A starts to follow replica C and it first truncates to HW
>>> 37, which removes the message at offset 37. Now, preferred leader
>>> logic kicks in and the leadership switches again to replica A. Since
>>> A doesn't have message at offset 37 any more and all followers copy
>>> messages from replica A, message at offset 37 is lost.
>>> 

>>> With KAFKA-3670, in the above example, when shutting down broker B,
>>> the leader will be directly moved to replica A since it's a
>>> preferred replica. So the above scenario won't happen.
>>> 

>>> The more complete fix is in KAFKA-1211. The logic for getting the
>>> latest generation snapshot is just a proposal and is not in the code
>>> base yet.
>>> 

>>> Thanks,

>>> 

>>> Jun

>>> 

>>> On Mon, Nov 21, 2016 at 3:20 PM, Mark Smith <ma...@qq.is> wrote:

>>>> Jun,

>>>> 

>>>> Thanks for the reply!

>>>> 

>>>> I am aware the HW won't advance until the in-sync replicas have

>>>> _requested_ the messages. However, I believe the issue is that the
>>>> leader has no guarantee the replicas have _received_ the fetch
>>>> response.
>>>> There is no second-phase to the commit.

>>>> 

>>>> So, in the particular case where a leader transition happens, I
>>>> believe
>>>> this race condition exists (and I'm happy to be wrong here, but it
>>>> looks
>>>> feasible and explains the data loss I saw):

>>>> 

>>>> 1. Starting point: Leader and Replica both only have up to message
>>>>    #36
>>>> 2. Client produces new message with required.acks=all

>>>> 3. Leader commits message #37, but HW is still #36, the produce
>>>>    request
>>>> is blocked

>>>> 4. Replica fetches messages (leader has RECEIVED the fetch request)
>>>> 5. Leader then advances HW to #37 and unblocks the produce request,
>>>> client believes it's durable

>>>> 6. PREFERRED REPLICA ELECTION BEGIN

>>>> 7. Replica starts become-leader process

>>>> 8. Leader finishes sending fetch response, replica is just now
>>>>    seeing
>>>> message #37

>>>> 9. Replica throws away fetch response from step 4 because it is now
>>>> becoming leader (partition has been removed from partitionMap so it
>>>> looks like data is ignored)

>>>> 10. Leader starts become-follower

>>>> 11. Leader truncates to replica HW offset of #36

>>>> 12. Message #37 was durably committed but is now lost

>>>> 

>>>> For the tickets you linked:

>>>> 

>>>> https://issues.apache.org/jira/browse/KAFKA-3670

>>>> * There was no shutdown involved in this case, so this shouldn't be
>>>> impacting.

>>>> 

>>>> https://issues.apache.org/jira/browse/KAFKA-1211

>>>> * I've read through this but I'm not entirely sure if it addresses
>>>>   the
>>>> above. I don't think it does, though. I don't see a step in the
>>>> ticket
>>>> about become-leader making a call to the old leader to get the
>>>> latest
>>>> generation snapshot?

>>>> 
>>>> --
>>>>  Mark Smith
>>>> mark@qq.is

>>>> 

>>>> On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:

>>>> > Mark,

>>>> >

>>>> > Thanks for reporting this. First, a clarification. The HW is
>>>> > actually
>>>> > never

>>>> > advanced until all in-sync followers have fetched the
>>>> > corresponding
>>>> > message. For example, in step 2, if all follower replicas issue a
>>>> > fetch
>>>> > request at offset 10, it serves as an indication that all
>>>> > replicas have
>>>> > received messages up to offset 9. So,only then, the HW is
>>>> > advanced to
>>>> > offset 10 (which is not inclusive).

>>>> >

>>>> > I think the problem that you are seeing are probably caused by
>>>> > two known
>>>> > issues. The first one is

>>>> > https://issues.apache.org/jira/browse/KAFKA-1211.

>>>> > The issue is that the HW is propagated asynchronously from the
>>>> > leader to
>>>> > the followers. If the leadership changes multiple time very
>>>> > quickly, what
>>>> > can happen is that a follower first truncates its data up to HW
>>>> > and then
>>>> > immediately becomes the new leader. Since the follower's HW may
>>>> > not be up
>>>> > to date, some previously committed messages could be lost. The
>>>> > second one
>>>> > is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is
>>>> > that
>>>> > controlled shutdown and leader balancing can cause leadership to
>>>> > change
>>>> > more than once quickly, which could expose the data loss problem
>>>> > in the
>>>> > first issue.

>>>> >

>>>> > The second issue has been fixed in 0.10.0. So, if you upgrade to
>>>> > that
>>>> > version or above, it should reduce the chance of hitting the
>>>> > first issue
>>>> > significantly. We are actively working on the first issue and
>>>> > hopefully
>>>> > it

>>>> > will be addressed in the next release.

>>>> >

>>>> > Jun

>>>> >

>>>> > On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <ma...@qq.is> wrote:

>>>> >

>>>> > > Hey folks,

>>>> > >

>>>> > > I work at Dropbox and I was doing some maintenance yesterday
>>>> > > and it
>>>> > > looks like we lost some committed data during a preferred
>>>> > > replica
>>>> > > election. As far as I understand this shouldn't happen, but I
>>>> > > have a
>>>> > > theory and want to run it by ya'll.

>>>> > >

>>>> > > Preamble:

>>>> > > * Kafka 0.9.0.1

>>>> > > * required.acks = -1 (All)

>>>> > > * min.insync.replicas = 2 (only 2 replicas for the partition,
>>>> > >   so we
>>>> > > require both to have the data)

>>>> > > * consumer is Kafka Connect

>>>> > > * 1400 topics, total of about 15,000 partitions

>>>> > > * 30 brokers

>>>> > >

>>>> > > I was performing some rolling restarts of brokers yesterday as
>>>> > > part of
>>>> > > our regular DRT (disaster testing) process and at the end that
>>>> > > always
>>>> > > leaves many partitions that need to be failed back to the
>>>> > > preferred
>>>> > > replica. There were about 8,000 partitions that needed moving.
>>>> > > I started
>>>> > > the election in Kafka Manager and it worked, but it looks like
>>>> > > 4 of
>>>> > > those 8,000 partitions experienced some relatively small amount
>>>> > > of data
>>>> > > loss at the tail.

>>>> > >

>>>> > > From the Kafka Connect point of view, we saw a handful of
>>>> > > these:
>>>> > >

>>>> > > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-
>>>> > > 5]
>>>> > > INFO Fetch offset 67614479952 is out of range, resetting offset
>>>> > > (o.a.k.c.c.i.Fetcher:595)

>>>> > >

>>>> > > I believe that was because it asked the new leader for data and
>>>> > > the new
>>>> > > leader had less data than the old leader. Indeed, the old
>>>> > > leader became
>>>> > > a follower and immediately truncated:

>>>> > >

>>>> > > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log

>>>> > > goscribe.client-host_activity-21 to offset 67614479601.

>>>> > >

>>>> > > Given the above production settings I don't know why KC would
>>>> > > ever see
>>>> > > an OffsetOutOfRange error but this caused KC to reset to the
>>>> > > beginning
>>>> > > of the partition. Various broker logs for the failover paint
>>>> > > the
>>>> > > following timeline:

>>>> > > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
>>>> > >

>>>> > > My current working theory that I'd love eyes on:

>>>> > >

>>>> > >   1. Leader receives produce request and appends to log,
>>>> > >      incrementing
>>>> > >   LEO, but given the durability requirements the HW is not
>>>> > >   incremented
>>>> > >   and the produce response is delayed (normal)

>>>> > >

>>>> > >   2. Replica sends Fetch request to leader as part of normal
>>>> > >      replication
>>>> > >   flow

>>>> > >

>>>> > >   3. Leader increments HW when it STARTS to respond to the
>>>> > >      Fetch request
>>>> > >   (per fetchMessages in ReplicaManager.scala), so the HW is
>>>> > >   updated as
>>>> > >   soon as we've prepared messages for response -- importantly
>>>> > >   the HW is
>>>> > >   updated even though the replica has not yet actually seen the
>>>> > >   messages, even given the durability settings we've got

>>>> > >

>>>> > >   4. Meanwhile, Kafka Connect sends Fetch request to leader and
>>>> > >      receives
>>>> > >   the messages below the new HW, but the messages have not
>>>> > >   actually been
>>>> > >   received by the replica yet still

>>>> > >

>>>> > >   5. Preferred replica election begins (oh the travesty!)

>>>> > >

>>>> > >   6. Replica starts the become-leader process and makeLeader
>>>> > >      removes
>>>> > >   this partition from partitionMap, which means when the
>>>> > >   response comes
>>>> > >   in finally, we ignore it (we discard the old-leader committed
>>>> > >   messages)

>>>> > >

>>>> > >   7. Old-leader starts become-follower process and truncates to
>>>> > >      the HW
>>>> > >   of the new-leader i.e. the old-leader has now thrown away
>>>> > >   data it had
>>>> > >   committed and given out moments ago

>>>> > >

>>>> > >   8. Kafka Connect sends Fetch request to the new-leader but
>>>> > >      its offset
>>>> > >   is now greater than the HW of the new-leader, so we get the

>>>> > >   OffsetOutOfRange error and restart

>>>> > >

>>>> > > Can someone tell me whether or not this is plausible? If it is,
>>>> > > is there
>>>> > > a known issue/bug filed for it? I'm not exactly sure what the
>>>> > > solution
>>>> > > is, but it does seem unfortunate that a normal operation
>>>> > > (leader
>>>> > > election with both brokers alive and well) can result in the
>>>> > > loss of
>>>> > > committed messages.

>>>> > >

>>>> > > And, if my theory doesn't hold, can anybody explain what
>>>> > > happened? I'm
>>>> > > happy to provide more logs or whatever.

>>>> > >

>>>> > > Thanks!

>>>> > >

>>>> > >

>>>> > > --

>>>> > > Mark Smith

>>>> > > mark@qq.is

>>>> > >

>> 



Re: Investigating apparent data loss during preferred replica election

Posted by Jun Rao <ju...@confluent.io>.
Hi, Mark,

So you did the manual leader election after the cluster is stabilized (i.e,
all replicas are in sync)? Then, it may not be related to the issue that I
described.

If there is just a single leadership change, what you described shouldn't
happen by design. I modified your steps to the following according to the
design and I am not sure how the message can be lost.

1. Starting point: Leader and Replica both only have up to message #36, HW
is at 37
2. Client produces new message with required.acks=all at offset 37
3.  the produce request is blocked
4. Replica fetches messages at offset 37
5. Leader's HW still at 37 and the produce request still blocked
5.1 Replica receives message at 37 and appends it to local log.
5.2 Replica fetches messages at offset 38
5.3 Leader's HW moves to 38 and the produce request is unblocked
5.4 Replica's HW still at 37
6. PREFERRED REPLICA ELECTION BEGIN
8. Replica becomes leader; no truncation is done; message at offset 37 is
preserved; HW still at 37
9. Connect issues a fetch request at 38 to replica and gets an empty
response instead of OffsetOutOfRangeException since the log end offset is
at 38.
9. Leader becomes follower; truncate to HW 38, keeping message at offset 37.
10. Leader starts fetch from offset 38
11. Replica moves HW to 38
12. Message #37 is preserved in both replicas and is not lost


BTW, do you have unclean leader election disabled? Is this issue
reproducible? If so, we can enable some debug level logging to see what's
causing this. Now, I am also not sure if this is a broker side issue or a
consumer side issue.

Thanks,

Jun


On Mon, Nov 21, 2016 at 5:20 PM, Mark Smith <ma...@qq.is> wrote:

> Jun,
>
> Yeah, I probably have an off-by-one issue in the HW description. I think
> you could pick any number here and the problem remains -- could you read
> through the steps I posted and see if they logically make sense, numbers
> aside?
>
> We definitely lost data in 4 partitions of the 8,000 that were elected,
> and there was only a single election for each partition. We had done a
> rolling restart hours before, but that had been done for a long time and
> everything was stable. We do not allow automatic election, it's a manual
> process that we initiate after the cluster has stabilized.
>
> So in this case, I still don't think any discussion about
> multiple-failovers is germane to the problem we saw. Each of our partitions
> only had a single failover, and yet 4 of them still truncated committed
> data.
>
> --
> Mark Smith
> mark@qq.is
>
>
> On Mon, Nov 21, 2016, at 05:12 PM, Jun Rao wrote:
>
> Hi, Mark,
>
> Hmm, the committing of a message at offset X is the equivalent of saying
> that the HW is at offset X + 1. So, in your example, if the producer
> publishes a new message at offset 37, this message won't be committed
> (i.e., HW moves to offset 38) until the leader sees the follower fetch from
> offset 38 (not offset 37). At that point, the follower would have received
> message at offset 37 in the fetch response and appended that message to its
> local log. If the follower now becomes the new leader, message at offset 37
> is preserved.
>
> The problem that I described regarding data loss can happen during a
> rolling restart. Suppose that you have 3 replicas A, B, and C. Let's say A
> is the preferred the leader, but during the deployment, the leader gets
> moved to replica B at some point and all 3 replicas are in sync. A new
> message is produced at offset 37 and is committed (leader's HW =38).
> However, the HW in replica A is still at 37. Now, we try to shutdown broker
> B and the leader gets moved to replica C. Replica A starts to follow
> replica C and it first truncates to HW 37, which removes the message at
> offset 37. Now, preferred leader logic kicks in and the leadership switches
> again to replica A. Since A doesn't have message at offset 37 any more and
> all followers copy messages from replica A, message at offset 37 is lost.
>
> With KAFKA-3670, in the above example, when shutting down broker B, the
> leader will be directly moved to replica A since it's a preferred replica.
> So the above scenario won't happen.
>
> The more complete fix is in KAFKA-1211. The logic for getting the latest
> generation snapshot is just a proposal and is not in the code base yet.
>
> Thanks,
>
> Jun
>
> On Mon, Nov 21, 2016 at 3:20 PM, Mark Smith <ma...@qq.is> wrote:
>
> Jun,
>
> Thanks for the reply!
>
> I am aware the HW won't advance until the in-sync replicas have
> _requested_ the messages. However, I believe the issue is that the
> leader has no guarantee the replicas have _received_ the fetch response.
> There is no second-phase to the commit.
>
> So, in the particular case where a leader transition happens, I believe
> this race condition exists (and I'm happy to be wrong here, but it looks
> feasible and explains the data loss I saw):
>
> 1. Starting point: Leader and Replica both only have up to message #36
> 2. Client produces new message with required.acks=all
> 3. Leader commits message #37, but HW is still #36, the produce request
> is blocked
> 4. Replica fetches messages (leader has RECEIVED the fetch request)
> 5. Leader then advances HW to #37 and unblocks the produce request,
> client believes it's durable
> 6. PREFERRED REPLICA ELECTION BEGIN
> 7. Replica starts become-leader process
> 8. Leader finishes sending fetch response, replica is just now seeing
> message #37
> 9. Replica throws away fetch response from step 4 because it is now
> becoming leader (partition has been removed from partitionMap so it
> looks like data is ignored)
> 10. Leader starts become-follower
> 11. Leader truncates to replica HW offset of #36
> 12. Message #37 was durably committed but is now lost
>
> For the tickets you linked:
>
> https://issues.apache.org/jira/browse/KAFKA-3670
> * There was no shutdown involved in this case, so this shouldn't be
> impacting.
>
> https://issues.apache.org/jira/browse/KAFKA-1211
> * I've read through this but I'm not entirely sure if it addresses the
> above. I don't think it does, though. I don't see a step in the ticket
> about become-leader making a call to the old leader to get the latest
> generation snapshot?
>
> --
> Mark Smith
> mark@qq.is
>
> On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:
> > Mark,
> >
> > Thanks for reporting this. First, a clarification. The HW is actually
> > never
> > advanced until all in-sync followers have fetched the corresponding
> > message. For example, in step 2, if all follower replicas issue a fetch
> > request at offset 10, it serves as an indication that all replicas have
> > received messages up to offset 9. So,only then, the HW is advanced to
> > offset 10 (which is not inclusive).
> >
> > I think the problem that you are seeing are probably caused by two known
> > issues. The first one is
> > https://issues.apache.org/jira/browse/KAFKA-1211.
> > The issue is that the HW is propagated asynchronously from the leader to
> > the followers. If the leadership changes multiple time very quickly, what
> > can happen is that a follower first truncates its data up to HW and then
> > immediately becomes the new leader. Since the follower's HW may not be up
> > to date, some previously committed messages could be lost. The second one
> > is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is that
> > controlled shutdown and leader balancing can cause leadership to change
> > more than once quickly, which could expose the data loss problem in the
> > first issue.
> >
> > The second issue has been fixed in 0.10.0. So, if you upgrade to that
> > version or above, it should reduce the chance of hitting the first issue
> > significantly. We are actively working on the first issue and hopefully
> > it
> > will be addressed in the next release.
> >
> > Jun
> >
> > On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <ma...@qq.is> wrote:
> >
> > > Hey folks,
> > >
> > > I work at Dropbox and I was doing some maintenance yesterday and it
> > > looks like we lost some committed data during a preferred replica
> > > election. As far as I understand this shouldn't happen, but I have a
> > > theory and want to run it by ya'll.
> > >
> > > Preamble:
> > > * Kafka 0.9.0.1
> > > * required.acks = -1 (All)
> > > * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> > > require both to have the data)
> > > * consumer is Kafka Connect
> > > * 1400 topics, total of about 15,000 partitions
> > > * 30 brokers
> > >
> > > I was performing some rolling restarts of brokers yesterday as part of
> > > our regular DRT (disaster testing) process and at the end that always
> > > leaves many partitions that need to be failed back to the preferred
> > > replica. There were about 8,000 partitions that needed moving. I
> started
> > > the election in Kafka Manager and it worked, but it looks like 4 of
> > > those 8,000 partitions experienced some relatively small amount of data
> > > loss at the tail.
> > >
> > > From the Kafka Connect point of view, we saw a handful of these:
> > >
> > > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analyt
> ics-staging-8-5]
> > > INFO Fetch offset 67614479952 is out of range, resetting offset
> > > (o.a.k.c.c.i.Fetcher:595)
> > >
> > > I believe that was because it asked the new leader for data and the new
> > > leader had less data than the old leader. Indeed, the old leader became
> > > a follower and immediately truncated:
> > >
> > > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log
> > > goscribe.client-host_activity-21 to offset 67614479601.
> > >
> > > Given the above production settings I don't know why KC would ever see
> > > an OffsetOutOfRange error but this caused KC to reset to the beginning
> > > of the partition. Various broker logs for the failover paint the
> > > following timeline:
> > > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
> > >
> > > My current working theory that I'd love eyes on:
> > >
> > >   1. Leader receives produce request and appends to log, incrementing
> > >   LEO, but given the durability requirements the HW is not incremented
> > >   and the produce response is delayed (normal)
> > >
> > >   2. Replica sends Fetch request to leader as part of normal
> replication
> > >   flow
> > >
> > >   3. Leader increments HW when it STARTS to respond to the Fetch
> request
> > >   (per fetchMessages in ReplicaManager.scala), so the HW is updated as
> > >   soon as we've prepared messages for response -- importantly the HW is
> > >   updated even though the replica has not yet actually seen the
> > >   messages, even given the durability settings we've got
> > >
> > >   4. Meanwhile, Kafka Connect sends Fetch request to leader and
> receives
> > >   the messages below the new HW, but the messages have not actually
> been
> > >   received by the replica yet still
> > >
> > >   5. Preferred replica election begins (oh the travesty!)
> > >
> > >   6. Replica starts the become-leader process and makeLeader removes
> > >   this partition from partitionMap, which means when the response comes
> > >   in finally, we ignore it (we discard the old-leader committed
> > >   messages)
> > >
> > >   7. Old-leader starts become-follower process and truncates to the HW
> > >   of the new-leader i.e. the old-leader has now thrown away data it had
> > >   committed and given out moments ago
> > >
> > >   8. Kafka Connect sends Fetch request to the new-leader but its offset
> > >   is now greater than the HW of the new-leader, so we get the
> > >   OffsetOutOfRange error and restart
> > >
> > > Can someone tell me whether or not this is plausible? If it is, is
> there
> > > a known issue/bug filed for it? I'm not exactly sure what the solution
> > > is, but it does seem unfortunate that a normal operation (leader
> > > election with both brokers alive and well) can result in the loss of
> > > committed messages.
> > >
> > > And, if my theory doesn't hold, can anybody explain what happened? I'm
> > > happy to provide more logs or whatever.
> > >
> > > Thanks!
> > >
> > >
> > > --
> > > Mark Smith
> > > mark@qq.is
> > >
>
>
>

Re: Investigating apparent data loss during preferred replica election

Posted by Mark Smith <ma...@qq.is>.
Jun,



Yeah, I probably have an off-by-one issue in the HW description. I
think you could pick any number here and the problem remains -- could
you read through the steps I posted and see if they logically make
sense, numbers aside?

We definitely lost data in 4 partitions of the 8,000 that were elected,
and there was only a single election for each partition. We had done a
rolling restart hours before, but that had been done for a long time and
everything was stable. We do not allow automatic election, it's a manual
process that we initiate after the cluster has stabilized.


So in this case, I still don't think any discussion about multiple-
failovers is germane to the problem we saw. Each of our partitions only
had a single failover, and yet 4 of them still truncated committed data.


--

Mark Smith

mark@qq.is





On Mon, Nov 21, 2016, at 05:12 PM, Jun Rao wrote:

> Hi, Mark,

> 

> Hmm, the committing of a message at offset X is the equivalent of
> saying that the HW is at offset X + 1. So, in your example, if the
> producer publishes a new message at offset 37, this message won't be
> committed (i.e., HW moves to offset 38) until the leader sees the
> follower fetch from offset 38 (not offset 37). At that point, the
> follower would have received message at offset 37 in the fetch
> response and appended that message to its local log. If the follower
> now becomes the new leader, message at offset 37 is preserved.
> 

> The problem that I described regarding data loss can happen during a
> rolling restart. Suppose that you have 3 replicas A, B, and C. Let's
> say A is the preferred the leader, but during the deployment, the
> leader gets moved to replica B at some point and all 3 replicas are in
> sync. A new message is produced at offset 37 and is committed
> (leader's HW =38). However, the HW in replica A is still at 37. Now,
> we try to shutdown broker B and the leader gets moved to replica C.
> Replica A starts to follow replica C and it first truncates to HW 37,
> which removes the message at offset 37. Now, preferred leader logic
> kicks in and the leadership switches again to replica A. Since A
> doesn't have message at offset 37 any more and all followers copy
> messages from replica A, message at offset 37 is lost.
> 

> With KAFKA-3670, in the above example, when shutting down broker B,
> the leader will be directly moved to replica A since it's a preferred
> replica. So the above scenario won't happen.
> 

> The more complete fix is in KAFKA-1211. The logic for getting the
> latest generation snapshot is just a proposal and is not in the code
> base yet.
> 

> Thanks,

> 

> Jun

> 

> On Mon, Nov 21, 2016 at 3:20 PM, Mark Smith <ma...@qq.is> wrote:

>> Jun,

>> 

>>  Thanks for the reply!

>> 

>>  I am aware the HW won't advance until the in-sync replicas have

>>  _requested_ the messages. However, I believe the issue is that the

>>  leader has no guarantee the replicas have _received_ the fetch
>>  response.
>>  There is no second-phase to the commit.

>> 

>>  So, in the particular case where a leader transition happens, I
>>  believe
>>  this race condition exists (and I'm happy to be wrong here, but
>>  it looks
>>  feasible and explains the data loss I saw):

>> 

>>  1. Starting point: Leader and Replica both only have up to message
>>     #36
>>  2. Client produces new message with required.acks=all

>>  3. Leader commits message #37, but HW is still #36, the produce
>>     request
>>  is blocked

>>  4. Replica fetches messages (leader has RECEIVED the fetch request)
>>  5. Leader then advances HW to #37 and unblocks the produce request,
>>  client believes it's durable

>>  6. PREFERRED REPLICA ELECTION BEGIN

>>  7. Replica starts become-leader process

>>  8. Leader finishes sending fetch response, replica is just now
>>     seeing
>>  message #37

>>  9. Replica throws away fetch response from step 4 because it is now
>>  becoming leader (partition has been removed from partitionMap so it
>>  looks like data is ignored)

>>  10. Leader starts become-follower

>>  11. Leader truncates to replica HW offset of #36

>>  12. Message #37 was durably committed but is now lost

>> 

>>  For the tickets you linked:

>> 

>> https://issues.apache.org/jira/browse/KAFKA-3670

>>  * There was no shutdown involved in this case, so this shouldn't be
>>  impacting.

>> 

>> https://issues.apache.org/jira/browse/KAFKA-1211

>>  * I've read through this but I'm not entirely sure if it
>>    addresses the
>>  above. I don't think it does, though. I don't see a step in the
>>  ticket
>>  about become-leader making a call to the old leader to get the
>>  latest
>>  generation snapshot?

>> 
>>  --
>>  Mark Smith
>> mark@qq.is

>> 

>> On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:

>>  > Mark,

>>  >

>>  > Thanks for reporting this. First, a clarification. The HW is
>>  > actually
>>  > never

>>  > advanced until all in-sync followers have fetched the
>>  > corresponding
>>  > message. For example, in step 2, if all follower replicas issue a
>>  > fetch
>>  > request at offset 10, it serves as an indication that all replicas
>>  > have
>>  > received messages up to offset 9. So,only then, the HW is
>>  > advanced to
>>  > offset 10 (which is not inclusive).

>>  >

>>  > I think the problem that you are seeing are probably caused by two
>>  > known
>>  > issues. The first one is

>>  > https://issues.apache.org/jira/browse/KAFKA-1211.

>>  > The issue is that the HW is propagated asynchronously from the
>>  > leader to
>>  > the followers. If the leadership changes multiple time very
>>  > quickly, what
>>  > can happen is that a follower first truncates its data up to HW
>>  > and then
>>  > immediately becomes the new leader. Since the follower's HW may
>>  > not be up
>>  > to date, some previously committed messages could be lost. The
>>  > second one
>>  > is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is
>>  > that
>>  > controlled shutdown and leader balancing can cause leadership to
>>  > change
>>  > more than once quickly, which could expose the data loss problem
>>  > in the
>>  > first issue.

>>  >

>>  > The second issue has been fixed in 0.10.0. So, if you upgrade to
>>  > that
>>  > version or above, it should reduce the chance of hitting the first
>>  > issue
>>  > significantly. We are actively working on the first issue and
>>  > hopefully
>>  > it

>>  > will be addressed in the next release.

>>  >

>>  > Jun

>>  >

>>  > On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <ma...@qq.is> wrote:

>>  >

>>  > > Hey folks,

>>  > >

>>  > > I work at Dropbox and I was doing some maintenance yesterday and
>>  > > it
>>  > > looks like we lost some committed data during a preferred
>>  > > replica
>>  > > election. As far as I understand this shouldn't happen, but I
>>  > > have a
>>  > > theory and want to run it by ya'll.

>>  > >

>>  > > Preamble:

>>  > > * Kafka 0.9.0.1

>>  > > * required.acks = -1 (All)

>>  > > * min.insync.replicas = 2 (only 2 replicas for the partition, so
>>  > >   we
>>  > > require both to have the data)

>>  > > * consumer is Kafka Connect

>>  > > * 1400 topics, total of about 15,000 partitions

>>  > > * 30 brokers

>>  > >

>>  > > I was performing some rolling restarts of brokers yesterday as
>>  > > part of
>>  > > our regular DRT (disaster testing) process and at the end that
>>  > > always
>>  > > leaves many partitions that need to be failed back to the
>>  > > preferred
>>  > > replica. There were about 8,000 partitions that needed moving. I
>>  > > started
>>  > > the election in Kafka Manager and it worked, but it looks like 4
>>  > > of
>>  > > those 8,000 partitions experienced some relatively small amount
>>  > > of data
>>  > > loss at the tail.

>>  > >

>>  > > From the Kafka Connect point of view, we saw a handful of these:
>>  > >

>>  > > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-
>>  > > 5]
>>  > > INFO Fetch offset 67614479952 is out of range, resetting offset
>>  > > (o.a.k.c.c.i.Fetcher:595)

>>  > >

>>  > > I believe that was because it asked the new leader for data and
>>  > > the new
>>  > > leader had less data than the old leader. Indeed, the old leader
>>  > > became
>>  > > a follower and immediately truncated:

>>  > >

>>  > > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log

>>  > > goscribe.client-host_activity-21 to offset 67614479601.

>>  > >

>>  > > Given the above production settings I don't know why KC would
>>  > > ever see
>>  > > an OffsetOutOfRange error but this caused KC to reset to the
>>  > > beginning
>>  > > of the partition. Various broker logs for the failover paint the
>>  > > following timeline:

>>  > > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
>>  > >

>>  > > My current working theory that I'd love eyes on:

>>  > >

>>  > >   1. Leader receives produce request and appends to log,
>>  > >      incrementing
>>  > >   LEO, but given the durability requirements the HW is not
>>  > >   incremented
>>  > >   and the produce response is delayed (normal)

>>  > >

>>  > >   2. Replica sends Fetch request to leader as part of normal
>>  > >      replication
>>  > >   flow

>>  > >

>>  > >   3. Leader increments HW when it STARTS to respond to the Fetch
>>  > >      request
>>  > >   (per fetchMessages in ReplicaManager.scala), so the HW is
>>  > >   updated as
>>  > >   soon as we've prepared messages for response -- importantly
>>  > >   the HW is
>>  > >   updated even though the replica has not yet actually seen the
>>  > >   messages, even given the durability settings we've got

>>  > >

>>  > >   4. Meanwhile, Kafka Connect sends Fetch request to leader and
>>  > >      receives
>>  > >   the messages below the new HW, but the messages have not
>>  > >   actually been
>>  > >   received by the replica yet still

>>  > >

>>  > >   5. Preferred replica election begins (oh the travesty!)

>>  > >

>>  > >   6. Replica starts the become-leader process and makeLeader
>>  > >      removes
>>  > >   this partition from partitionMap, which means when the
>>  > >   response comes
>>  > >   in finally, we ignore it (we discard the old-leader committed
>>  > >   messages)

>>  > >

>>  > >   7. Old-leader starts become-follower process and truncates to
>>  > >      the HW
>>  > >   of the new-leader i.e. the old-leader has now thrown away data
>>  > >   it had
>>  > >   committed and given out moments ago

>>  > >

>>  > >   8. Kafka Connect sends Fetch request to the new-leader but its
>>  > >      offset
>>  > >   is now greater than the HW of the new-leader, so we get the

>>  > >   OffsetOutOfRange error and restart

>>  > >

>>  > > Can someone tell me whether or not this is plausible? If it is,
>>  > > is there
>>  > > a known issue/bug filed for it? I'm not exactly sure what the
>>  > > solution
>>  > > is, but it does seem unfortunate that a normal operation (leader
>>  > > election with both brokers alive and well) can result in the
>>  > > loss of
>>  > > committed messages.

>>  > >

>>  > > And, if my theory doesn't hold, can anybody explain what
>>  > > happened? I'm
>>  > > happy to provide more logs or whatever.

>>  > >

>>  > > Thanks!

>>  > >

>>  > >

>>  > > --

>>  > > Mark Smith

>>  > > mark@qq.is

>>  > >



Re: Investigating apparent data loss during preferred replica election

Posted by Jun Rao <ju...@confluent.io>.
Hi, Mark,

Hmm, the committing of a message at offset X is the equivalent of saying
that the HW is at offset X + 1. So, in your example, if the producer
publishes a new message at offset 37, this message won't be committed
(i.e., HW moves to offset 38) until the leader sees the follower fetch from
offset 38 (not offset 37). At that point, the follower would have received
message at offset 37 in the fetch response and appended that message to its
local log. If the follower now becomes the new leader, message at offset 37
is preserved.

The problem that I described regarding data loss can happen during a
rolling restart. Suppose that you have 3 replicas A, B, and C. Let's say A
is the preferred the leader, but during the deployment, the leader gets
moved to replica B at some point and all 3 replicas are in sync. A new
message is produced at offset 37 and is committed (leader's HW =38).
However, the HW in replica A is still at 37. Now, we try to shutdown broker
B and the leader gets moved to replica C. Replica A starts to follow
replica C and it first truncates to HW 37, which removes the message at
offset 37. Now, preferred leader logic kicks in and the leadership switches
again to replica A. Since A doesn't have message at offset 37 any more and
all followers copy messages from replica A, message at offset 37 is lost.

With KAFKA-3670, in the above example, when shutting down broker B, the
leader will be directly moved to replica A since it's a preferred replica.
So the above scenario won't happen.

The more complete fix is in KAFKA-1211. The logic for getting the latest
generation snapshot is just a proposal and is not in the code base yet.

Thanks,

Jun

On Mon, Nov 21, 2016 at 3:20 PM, Mark Smith <ma...@qq.is> wrote:

> Jun,
>
> Thanks for the reply!
>
> I am aware the HW won't advance until the in-sync replicas have
> _requested_ the messages. However, I believe the issue is that the
> leader has no guarantee the replicas have _received_ the fetch response.
> There is no second-phase to the commit.
>
> So, in the particular case where a leader transition happens, I believe
> this race condition exists (and I'm happy to be wrong here, but it looks
> feasible and explains the data loss I saw):
>
> 1. Starting point: Leader and Replica both only have up to message #36
> 2. Client produces new message with required.acks=all
> 3. Leader commits message #37, but HW is still #36, the produce request
> is blocked
> 4. Replica fetches messages (leader has RECEIVED the fetch request)
> 5. Leader then advances HW to #37 and unblocks the produce request,
> client believes it's durable
> 6. PREFERRED REPLICA ELECTION BEGIN
> 7. Replica starts become-leader process
> 8. Leader finishes sending fetch response, replica is just now seeing
> message #37
> 9. Replica throws away fetch response from step 4 because it is now
> becoming leader (partition has been removed from partitionMap so it
> looks like data is ignored)
> 10. Leader starts become-follower
> 11. Leader truncates to replica HW offset of #36
> 12. Message #37 was durably committed but is now lost
>
> For the tickets you linked:
>
> https://issues.apache.org/jira/browse/KAFKA-3670
> * There was no shutdown involved in this case, so this shouldn't be
> impacting.
>
> https://issues.apache.org/jira/browse/KAFKA-1211
> * I've read through this but I'm not entirely sure if it addresses the
> above. I don't think it does, though. I don't see a step in the ticket
> about become-leader making a call to the old leader to get the latest
> generation snapshot?
>
> --
> Mark Smith
> mark@qq.is
>
> On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:
> > Mark,
> >
> > Thanks for reporting this. First, a clarification. The HW is actually
> > never
> > advanced until all in-sync followers have fetched the corresponding
> > message. For example, in step 2, if all follower replicas issue a fetch
> > request at offset 10, it serves as an indication that all replicas have
> > received messages up to offset 9. So,only then, the HW is advanced to
> > offset 10 (which is not inclusive).
> >
> > I think the problem that you are seeing are probably caused by two known
> > issues. The first one is
> > https://issues.apache.org/jira/browse/KAFKA-1211.
> > The issue is that the HW is propagated asynchronously from the leader to
> > the followers. If the leadership changes multiple time very quickly, what
> > can happen is that a follower first truncates its data up to HW and then
> > immediately becomes the new leader. Since the follower's HW may not be up
> > to date, some previously committed messages could be lost. The second one
> > is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is that
> > controlled shutdown and leader balancing can cause leadership to change
> > more than once quickly, which could expose the data loss problem in the
> > first issue.
> >
> > The second issue has been fixed in 0.10.0. So, if you upgrade to that
> > version or above, it should reduce the chance of hitting the first issue
> > significantly. We are actively working on the first issue and hopefully
> > it
> > will be addressed in the next release.
> >
> > Jun
> >
> > On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <ma...@qq.is> wrote:
> >
> > > Hey folks,
> > >
> > > I work at Dropbox and I was doing some maintenance yesterday and it
> > > looks like we lost some committed data during a preferred replica
> > > election. As far as I understand this shouldn't happen, but I have a
> > > theory and want to run it by ya'll.
> > >
> > > Preamble:
> > > * Kafka 0.9.0.1
> > > * required.acks = -1 (All)
> > > * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> > > require both to have the data)
> > > * consumer is Kafka Connect
> > > * 1400 topics, total of about 15,000 partitions
> > > * 30 brokers
> > >
> > > I was performing some rolling restarts of brokers yesterday as part of
> > > our regular DRT (disaster testing) process and at the end that always
> > > leaves many partitions that need to be failed back to the preferred
> > > replica. There were about 8,000 partitions that needed moving. I
> started
> > > the election in Kafka Manager and it worked, but it looks like 4 of
> > > those 8,000 partitions experienced some relatively small amount of data
> > > loss at the tail.
> > >
> > > From the Kafka Connect point of view, we saw a handful of these:
> > >
> > > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-
> analytics-staging-8-5]
> > > INFO Fetch offset 67614479952 is out of range, resetting offset
> > > (o.a.k.c.c.i.Fetcher:595)
> > >
> > > I believe that was because it asked the new leader for data and the new
> > > leader had less data than the old leader. Indeed, the old leader became
> > > a follower and immediately truncated:
> > >
> > > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log
> > > goscribe.client-host_activity-21 to offset 67614479601.
> > >
> > > Given the above production settings I don't know why KC would ever see
> > > an OffsetOutOfRange error but this caused KC to reset to the beginning
> > > of the partition. Various broker logs for the failover paint the
> > > following timeline:
> > > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
> > >
> > > My current working theory that I'd love eyes on:
> > >
> > >   1. Leader receives produce request and appends to log, incrementing
> > >   LEO, but given the durability requirements the HW is not incremented
> > >   and the produce response is delayed (normal)
> > >
> > >   2. Replica sends Fetch request to leader as part of normal
> replication
> > >   flow
> > >
> > >   3. Leader increments HW when it STARTS to respond to the Fetch
> request
> > >   (per fetchMessages in ReplicaManager.scala), so the HW is updated as
> > >   soon as we've prepared messages for response -- importantly the HW is
> > >   updated even though the replica has not yet actually seen the
> > >   messages, even given the durability settings we've got
> > >
> > >   4. Meanwhile, Kafka Connect sends Fetch request to leader and
> receives
> > >   the messages below the new HW, but the messages have not actually
> been
> > >   received by the replica yet still
> > >
> > >   5. Preferred replica election begins (oh the travesty!)
> > >
> > >   6. Replica starts the become-leader process and makeLeader removes
> > >   this partition from partitionMap, which means when the response comes
> > >   in finally, we ignore it (we discard the old-leader committed
> > >   messages)
> > >
> > >   7. Old-leader starts become-follower process and truncates to the HW
> > >   of the new-leader i.e. the old-leader has now thrown away data it had
> > >   committed and given out moments ago
> > >
> > >   8. Kafka Connect sends Fetch request to the new-leader but its offset
> > >   is now greater than the HW of the new-leader, so we get the
> > >   OffsetOutOfRange error and restart
> > >
> > > Can someone tell me whether or not this is plausible? If it is, is
> there
> > > a known issue/bug filed for it? I'm not exactly sure what the solution
> > > is, but it does seem unfortunate that a normal operation (leader
> > > election with both brokers alive and well) can result in the loss of
> > > committed messages.
> > >
> > > And, if my theory doesn't hold, can anybody explain what happened? I'm
> > > happy to provide more logs or whatever.
> > >
> > > Thanks!
> > >
> > >
> > > --
> > > Mark Smith
> > > mark@qq.is
> > >
>

Re: Investigating apparent data loss during preferred replica election

Posted by Mark Smith <ma...@qq.is>.
Jun,

Thanks for the reply!

I am aware the HW won't advance until the in-sync replicas have
_requested_ the messages. However, I believe the issue is that the
leader has no guarantee the replicas have _received_ the fetch response.
There is no second-phase to the commit.

So, in the particular case where a leader transition happens, I believe
this race condition exists (and I'm happy to be wrong here, but it looks
feasible and explains the data loss I saw):

1. Starting point: Leader and Replica both only have up to message #36
2. Client produces new message with required.acks=all
3. Leader commits message #37, but HW is still #36, the produce request
is blocked
4. Replica fetches messages (leader has RECEIVED the fetch request)
5. Leader then advances HW to #37 and unblocks the produce request,
client believes it's durable
6. PREFERRED REPLICA ELECTION BEGIN
7. Replica starts become-leader process
8. Leader finishes sending fetch response, replica is just now seeing
message #37
9. Replica throws away fetch response from step 4 because it is now
becoming leader (partition has been removed from partitionMap so it
looks like data is ignored)
10. Leader starts become-follower
11. Leader truncates to replica HW offset of #36
12. Message #37 was durably committed but is now lost

For the tickets you linked:

https://issues.apache.org/jira/browse/KAFKA-3670
* There was no shutdown involved in this case, so this shouldn't be
impacting.

https://issues.apache.org/jira/browse/KAFKA-1211
* I've read through this but I'm not entirely sure if it addresses the
above. I don't think it does, though. I don't see a step in the ticket
about become-leader making a call to the old leader to get the latest
generation snapshot?

-- 
Mark Smith
mark@qq.is

On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:
> Mark,
> 
> Thanks for reporting this. First, a clarification. The HW is actually
> never
> advanced until all in-sync followers have fetched the corresponding
> message. For example, in step 2, if all follower replicas issue a fetch
> request at offset 10, it serves as an indication that all replicas have
> received messages up to offset 9. So,only then, the HW is advanced to
> offset 10 (which is not inclusive).
> 
> I think the problem that you are seeing are probably caused by two known
> issues. The first one is
> https://issues.apache.org/jira/browse/KAFKA-1211.
> The issue is that the HW is propagated asynchronously from the leader to
> the followers. If the leadership changes multiple time very quickly, what
> can happen is that a follower first truncates its data up to HW and then
> immediately becomes the new leader. Since the follower's HW may not be up
> to date, some previously committed messages could be lost. The second one
> is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is that
> controlled shutdown and leader balancing can cause leadership to change
> more than once quickly, which could expose the data loss problem in the
> first issue.
> 
> The second issue has been fixed in 0.10.0. So, if you upgrade to that
> version or above, it should reduce the chance of hitting the first issue
> significantly. We are actively working on the first issue and hopefully
> it
> will be addressed in the next release.
> 
> Jun
> 
> On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <ma...@qq.is> wrote:
> 
> > Hey folks,
> >
> > I work at Dropbox and I was doing some maintenance yesterday and it
> > looks like we lost some committed data during a preferred replica
> > election. As far as I understand this shouldn't happen, but I have a
> > theory and want to run it by ya'll.
> >
> > Preamble:
> > * Kafka 0.9.0.1
> > * required.acks = -1 (All)
> > * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> > require both to have the data)
> > * consumer is Kafka Connect
> > * 1400 topics, total of about 15,000 partitions
> > * 30 brokers
> >
> > I was performing some rolling restarts of brokers yesterday as part of
> > our regular DRT (disaster testing) process and at the end that always
> > leaves many partitions that need to be failed back to the preferred
> > replica. There were about 8,000 partitions that needed moving. I started
> > the election in Kafka Manager and it worked, but it looks like 4 of
> > those 8,000 partitions experienced some relatively small amount of data
> > loss at the tail.
> >
> > From the Kafka Connect point of view, we saw a handful of these:
> >
> > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5]
> > INFO Fetch offset 67614479952 is out of range, resetting offset
> > (o.a.k.c.c.i.Fetcher:595)
> >
> > I believe that was because it asked the new leader for data and the new
> > leader had less data than the old leader. Indeed, the old leader became
> > a follower and immediately truncated:
> >
> > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log
> > goscribe.client-host_activity-21 to offset 67614479601.
> >
> > Given the above production settings I don't know why KC would ever see
> > an OffsetOutOfRange error but this caused KC to reset to the beginning
> > of the partition. Various broker logs for the failover paint the
> > following timeline:
> > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
> >
> > My current working theory that I'd love eyes on:
> >
> >   1. Leader receives produce request and appends to log, incrementing
> >   LEO, but given the durability requirements the HW is not incremented
> >   and the produce response is delayed (normal)
> >
> >   2. Replica sends Fetch request to leader as part of normal replication
> >   flow
> >
> >   3. Leader increments HW when it STARTS to respond to the Fetch request
> >   (per fetchMessages in ReplicaManager.scala), so the HW is updated as
> >   soon as we've prepared messages for response -- importantly the HW is
> >   updated even though the replica has not yet actually seen the
> >   messages, even given the durability settings we've got
> >
> >   4. Meanwhile, Kafka Connect sends Fetch request to leader and receives
> >   the messages below the new HW, but the messages have not actually been
> >   received by the replica yet still
> >
> >   5. Preferred replica election begins (oh the travesty!)
> >
> >   6. Replica starts the become-leader process and makeLeader removes
> >   this partition from partitionMap, which means when the response comes
> >   in finally, we ignore it (we discard the old-leader committed
> >   messages)
> >
> >   7. Old-leader starts become-follower process and truncates to the HW
> >   of the new-leader i.e. the old-leader has now thrown away data it had
> >   committed and given out moments ago
> >
> >   8. Kafka Connect sends Fetch request to the new-leader but its offset
> >   is now greater than the HW of the new-leader, so we get the
> >   OffsetOutOfRange error and restart
> >
> > Can someone tell me whether or not this is plausible? If it is, is there
> > a known issue/bug filed for it? I'm not exactly sure what the solution
> > is, but it does seem unfortunate that a normal operation (leader
> > election with both brokers alive and well) can result in the loss of
> > committed messages.
> >
> > And, if my theory doesn't hold, can anybody explain what happened? I'm
> > happy to provide more logs or whatever.
> >
> > Thanks!
> >
> >
> > --
> > Mark Smith
> > mark@qq.is
> >

Re: Investigating apparent data loss during preferred replica election

Posted by Jun Rao <ju...@confluent.io>.
Mark,

Thanks for reporting this. First, a clarification. The HW is actually never
advanced until all in-sync followers have fetched the corresponding
message. For example, in step 2, if all follower replicas issue a fetch
request at offset 10, it serves as an indication that all replicas have
received messages up to offset 9. So,only then, the HW is advanced to
offset 10 (which is not inclusive).

I think the problem that you are seeing are probably caused by two known
issues. The first one is https://issues.apache.org/jira/browse/KAFKA-1211.
The issue is that the HW is propagated asynchronously from the leader to
the followers. If the leadership changes multiple time very quickly, what
can happen is that a follower first truncates its data up to HW and then
immediately becomes the new leader. Since the follower's HW may not be up
to date, some previously committed messages could be lost. The second one
is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is that
controlled shutdown and leader balancing can cause leadership to change
more than once quickly, which could expose the data loss problem in the
first issue.

The second issue has been fixed in 0.10.0. So, if you upgrade to that
version or above, it should reduce the chance of hitting the first issue
significantly. We are actively working on the first issue and hopefully it
will be addressed in the next release.

Jun

On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <ma...@qq.is> wrote:

> Hey folks,
>
> I work at Dropbox and I was doing some maintenance yesterday and it
> looks like we lost some committed data during a preferred replica
> election. As far as I understand this shouldn't happen, but I have a
> theory and want to run it by ya'll.
>
> Preamble:
> * Kafka 0.9.0.1
> * required.acks = -1 (All)
> * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> require both to have the data)
> * consumer is Kafka Connect
> * 1400 topics, total of about 15,000 partitions
> * 30 brokers
>
> I was performing some rolling restarts of brokers yesterday as part of
> our regular DRT (disaster testing) process and at the end that always
> leaves many partitions that need to be failed back to the preferred
> replica. There were about 8,000 partitions that needed moving. I started
> the election in Kafka Manager and it worked, but it looks like 4 of
> those 8,000 partitions experienced some relatively small amount of data
> loss at the tail.
>
> From the Kafka Connect point of view, we saw a handful of these:
>
> [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5]
> INFO Fetch offset 67614479952 is out of range, resetting offset
> (o.a.k.c.c.i.Fetcher:595)
>
> I believe that was because it asked the new leader for data and the new
> leader had less data than the old leader. Indeed, the old leader became
> a follower and immediately truncated:
>
> 2016-11-17 02:55:27,237 INFO log.Log: Truncating log
> goscribe.client-host_activity-21 to offset 67614479601.
>
> Given the above production settings I don't know why KC would ever see
> an OffsetOutOfRange error but this caused KC to reset to the beginning
> of the partition. Various broker logs for the failover paint the
> following timeline:
> https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
>
> My current working theory that I'd love eyes on:
>
>   1. Leader receives produce request and appends to log, incrementing
>   LEO, but given the durability requirements the HW is not incremented
>   and the produce response is delayed (normal)
>
>   2. Replica sends Fetch request to leader as part of normal replication
>   flow
>
>   3. Leader increments HW when it STARTS to respond to the Fetch request
>   (per fetchMessages in ReplicaManager.scala), so the HW is updated as
>   soon as we've prepared messages for response -- importantly the HW is
>   updated even though the replica has not yet actually seen the
>   messages, even given the durability settings we've got
>
>   4. Meanwhile, Kafka Connect sends Fetch request to leader and receives
>   the messages below the new HW, but the messages have not actually been
>   received by the replica yet still
>
>   5. Preferred replica election begins (oh the travesty!)
>
>   6. Replica starts the become-leader process and makeLeader removes
>   this partition from partitionMap, which means when the response comes
>   in finally, we ignore it (we discard the old-leader committed
>   messages)
>
>   7. Old-leader starts become-follower process and truncates to the HW
>   of the new-leader i.e. the old-leader has now thrown away data it had
>   committed and given out moments ago
>
>   8. Kafka Connect sends Fetch request to the new-leader but its offset
>   is now greater than the HW of the new-leader, so we get the
>   OffsetOutOfRange error and restart
>
> Can someone tell me whether or not this is plausible? If it is, is there
> a known issue/bug filed for it? I'm not exactly sure what the solution
> is, but it does seem unfortunate that a normal operation (leader
> election with both brokers alive and well) can result in the loss of
> committed messages.
>
> And, if my theory doesn't hold, can anybody explain what happened? I'm
> happy to provide more logs or whatever.
>
> Thanks!
>
>
> --
> Mark Smith
> mark@qq.is
>