Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2014/07/24 18:24:39 UTC

[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

    [ https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073346#comment-14073346 ] 

Jun Rao commented on KAFKA-1211:
--------------------------------

Yes, this is a potential problem. However, waiting for the HW to be propagated to the followers would introduce another round of network delay for every message to be committed. The following is another potential solution that avoids this overhead.

Note that a follower in ISR always has all committed messages. On follower startup, if we can accurately figure out which messages are committed and which are not, we won't unnecessarily truncate committed messages. Note that when a follower takes over as the new leader, it always tries to commit all existing messages obtained from the previous generation of the leader. After that, it starts committing new messages received in its own generation. If we can track the leader generation of each message, we can do the truncation accurately. To do that, each replica maintains a leader generation vector that contains, for each generation, the leader generation id and its starting offset (the offset of the first message written by the leader in that generation), and persists that vector in a local LeaderGeneration file.
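
Just to make the proposal concrete, here is a minimal sketch in Java of such a leader generation vector and how it could be persisted in the local LeaderGeneration file. The class names and the file format are assumptions for illustration, not actual Kafka code.

{noformat}
// A minimal sketch (not actual Kafka code) of the leader generation vector and
// the local LeaderGeneration file. Class names and file format are assumptions.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class LeaderGenerationVector {
    // One entry per leader generation: the generation id and the offset of the
    // first message written by the leader in that generation.
    static final class Entry {
        final int leaderGenId;
        final long startOffset;
        Entry(int leaderGenId, long startOffset) {
            this.leaderGenId = leaderGenId;
            this.startOffset = startOffset;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void append(int leaderGenId, long startOffset) {
        entries.add(new Entry(leaderGenId, startOffset));
    }

    List<Entry> entries() {
        return entries;
    }

    // Persist the vector to the local LeaderGeneration file,
    // one "leaderGenId startOffset" pair per line.
    void flush(Path leaderGenerationFile) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (Entry e : entries) {
            sb.append(e.leaderGenId).append(' ').append(e.startOffset).append('\n');
        }
        Files.write(leaderGenerationFile, sb.toString().getBytes(StandardCharsets.UTF_8));
    }
}
{noformat}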

If a replica becomes a leader, before it accepts any new message, it first appends the current leader generation id and its current log end offset to the LeaderGeneration file. If a replica becomes a follower, it first gets the leader generation vector from the leader and then determines the offset at which its highest leader generation ends in the leader. It then truncates any messages beyond that offset from its local log. After that, the follower stores the leader generation vector obtained from the leader in its local LeaderGeneration file and starts fetching messages from the leader starting at its log end offset.
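
The two transitions could then look roughly like the following sketch, under the same assumptions as above; it leaves out the actual log truncation and the RPC for fetching the leader's vector.

{noformat}
// A rough sketch of the two transitions described above, reusing the hypothetical
// LeaderGenerationVector from the previous sketch. Method names are assumptions.
class ReplicaTransitions {

    // Becoming leader: record the new generation and its starting offset (the
    // current log end offset) before accepting any new message.
    static void becomeLeader(LeaderGenerationVector vector, int newLeaderGenId,
                             long logEndOffset, java.nio.file.Path leaderGenerationFile)
            throws java.io.IOException {
        vector.append(newLeaderGenId, logEndOffset);
        vector.flush(leaderGenerationFile);
    }

    // Becoming follower: find where this replica's highest generation ends on the
    // leader, truncate anything beyond it, and resume fetching from the returned
    // offset. On the leader, generation g ends just before the start offset of the
    // first generation newer than g; if no newer generation exists, no truncation
    // is needed and we keep fetching from our own log end offset.
    static long becomeFollower(LeaderGenerationVector localVector,
                               LeaderGenerationVector leaderVector,
                               long localLogEndOffset) {
        int myLatestGen = lastGenId(localVector);
        long truncationOffset = localLogEndOffset;
        for (LeaderGenerationVector.Entry e : leaderVector.entries()) {
            if (e.leaderGenId > myLatestGen) {
                truncationOffset = Math.min(truncationOffset, e.startOffset);
                break;
            }
        }
        // Here the local log would be truncated so that its new log end offset is
        // truncationOffset; messages at or beyond it may never have been committed.
        return truncationOffset;
    }

    private static int lastGenId(LeaderGenerationVector v) {
        java.util.List<LeaderGenerationVector.Entry> es = v.entries();
        return es.isEmpty() ? -1 : es.get(es.size() - 1).leaderGenId;
    }
}
{noformat}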

Let's consider a couple of examples. 

Example 1. Suppose that we have two replicas A and B and A is the leader. At some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2
{noformat}
 
Let's assume that message m1 is committed, but m2 is not. At this point, A dies and B takes over as the leader. Let's say B then commits 2 more messages m3 and m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m3
2               m4
{noformat}

When replica A comes back, it's important for A to get rid of m2 at offset 1 since m2 was never committed. In this case, the leader generation vectors in A and B will look like the following.

{noformat}
         A                              B
leaderGenId   startOffset      leaderGenId   startOffset
1             0                1             0
                               2             1
{noformat}

By comparing A's leader generation vector with that from the current leader B, A knows that its latest messages were produced by the leader in generation 1, which ends at offset 0 in B. So any message in its local log after offset 0 is not committed and can be truncated, while any message at or before offset 0 is guaranteed to be committed. Replica A will therefore remove m2 at offset 1 and then fetch m3 and m4 from B. At that point, A's log is consistent with that of B: all committed messages are preserved and all uncommitted messages are removed.
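
Plugging the vectors from this example into the hypothetical sketches above (again, purely illustrative):

{noformat}
// Example 1 expressed with the hypothetical sketches above.
public class Example1Walkthrough {
    public static void main(String[] args) {
        LeaderGenerationVector aVector = new LeaderGenerationVector();
        aVector.append(1, 0);   // A: generation 1 starts at offset 0

        LeaderGenerationVector bVector = new LeaderGenerationVector();
        bVector.append(1, 0);   // B: generation 1 starts at offset 0
        bVector.append(2, 1);   // B: generation 2 starts at offset 1

        // A's log end offset is 2 (it holds m1 at offset 0 and m2 at offset 1).
        long resumeFrom = ReplicaTransitions.becomeFollower(aVector, bVector, 2L);

        // Prints 1: A keeps offset 0 (m1), truncates offset 1 (m2), and then
        // fetches m3 and m4 from B starting at offset 1.
        System.out.println(resumeFrom);
    }
}
{noformat}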

Example 2. Suppose that we have two replicas A and B and A is the leader. At some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
{noformat}
 
Let's assume that both messages m1 and m2 are committed. At this point, A dies and B takes over as the leader. Let's say B then commits 2 more messages m3 and m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
2               m3
3               m4
{noformat}

In this case, the leader generation vector in A and B will look like the following.

{noformat}
         A                              B
leaderGenId   startOffset      leaderGenId   startOffset
1             0                1             0
                               2             2
{noformat}

When A comes back, by comparing its leader generation vector with that from the current leader B, A knows that its latest messages were produced by the leader in generation 1, which ends at offset 1 in B. So it will keep m2 at offset 1 and fetch m3 and m4 from B. Again, this makes A's log consistent with B's.

This approach doesn't pay the extra network roundtrip to commit a message. The become-follower process will be a bit slower since it now needs to issue a new request to get the leader generation vector before it can start fetching from the leader. However, since leader changes are rare, this probably provides a better tradeoff. There are also other details that need to be worked out.

1. We need to deal with the case where the leader generation vector has a gap, i.e., no messages are produced in a leader generation.
2. We probably need to remove old leader generations from the LeaderGeneration file so that it won't grow forever. Perhaps we need to configure a max # of generations to keep (see the rough sketch after this list).
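
For item 2, a rough sketch of what the pruning could look like; the cap is a hypothetical config, not an existing setting.

{noformat}
// A tiny sketch of item 2: cap the number of generations retained in the
// LeaderGeneration file. maxGenerationsToKeep is a hypothetical config.
class LeaderGenerationPruning {
    static void prune(java.util.List<LeaderGenerationVector.Entry> entries,
                      int maxGenerationsToKeep) {
        // Drop the oldest generation entries until we are within the cap.
        while (entries.size() > maxGenerationsToKeep) {
            entries.remove(0);
        }
    }
}
{noformat}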

Since this problem is relatively rare and the fix is a bit involved, we can probably put it off until 0.9 or beyond.




> Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1211
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1211
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>
> Today during leader failover we have a window of weakness while the followers truncate their data before fetching from the new leader, i.e., the number of in-sync replicas is just 1. If the leader also fails during this time, then produce requests with ack > 1 that have already been responded to can still be lost. To avoid this scenario we would prefer to hold the produce request in purgatory until the replicas' HW is larger than the produce offset, instead of just their log end offsets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)