You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/29 05:19:25 UTC

kafka git commit: KAFKA-2143: fix replica offset truncate to beginning during leader migration.

Repository: kafka
Updated Branches:
  refs/heads/trunk af0df0961 -> 87eccb9a3


KAFKA-2143: fix replica offset truncate to beginning during leader migration.

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Gwen Shapira, Guozhang Wang

Closes #129 from becketqin/KAFKA-2143


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/87eccb9a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/87eccb9a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/87eccb9a

Branch: refs/heads/trunk
Commit: 87eccb9a3bea56e5d7d5696aaddef1421f038903
Parents: af0df09
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Jan 28 20:19:21 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jan 28 20:19:21 2016 -0800

----------------------------------------------------------------------
 .../kafka/server/ReplicaFetcherThread.scala     | 27 +++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/87eccb9a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8ce0c8f..a672917 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -176,17 +176,36 @@ class ReplicaFetcherThread(name: String,
       leaderEndOffset
     } else {
       /**
-       * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+       * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
+       * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
        * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+       * 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than
+       * the new leader's log end offset. So when the old leader truncates its offset to its high watermark and starts
+       * to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that, more messages may be
+       * produced to the new leader. By the time the old leader handles the OffsetOutOfRangeException and queries the
+       * new leader's log end offset, that offset may have become higher than the follower's log end offset.
+       *
+       * In the first case, the follower's current log end offset is smaller than leader's log start offset. So the
+       * follower should truncate all its log, roll out a new segment and start to fetch from current leader's log
+       * start offset.
+       * In the second case, the follower should just keep the current log segments and retry fetch. In the second
+       * case, there will be some data inconsistency between the old leader and the new leader. We are not solving it here.
+       * If users want a strong consistency guarantee, appropriate configurations need to be set for both
+       * brokers and producers.
+       *
+       * Putting the two cases together, the follower should fetch from the higher of its replica log end offset
+       * and the current leader's log start offset.
        *
-       * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
        */
       val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
         brokerConfig.brokerId)
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
         .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
-      replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
-      leaderStartOffset
+      val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
+      // Only truncate log when current leader's log start offset is greater than follower's log end offset.
+      if (leaderStartOffset > replica.logEndOffset.messageOffset)
+        replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
+      offsetToFetch
     }
   }