You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/29 05:19:25 UTC
kafka git commit: KAFKA-2143: fix replica offset truncate to
beginning during leader migration.
Repository: kafka
Updated Branches:
refs/heads/trunk af0df0961 -> 87eccb9a3
KAFKA-2143: fix replica offset truncate to beginning during leader migration.
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Gwen Shapira, Guozhang Wang
Closes #129 from becketqin/KAFKA-2143
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/87eccb9a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/87eccb9a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/87eccb9a
Branch: refs/heads/trunk
Commit: 87eccb9a3bea56e5d7d5696aaddef1421f038903
Parents: af0df09
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Jan 28 20:19:21 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jan 28 20:19:21 2016 -0800
----------------------------------------------------------------------
.../kafka/server/ReplicaFetcherThread.scala | 27 +++++++++++++++++---
1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/87eccb9a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8ce0c8f..a672917 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -176,17 +176,36 @@ class ReplicaFetcherThread(name: String,
leaderEndOffset
} else {
/**
- * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+ * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
+ * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
* start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+ * 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than
+ * the new leader's log end offset. So when the old leader truncates its log to its high watermark and starts
+ * to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that, more messages are
+ * produced to the new leader. By the time the old leader handles the OffsetOutOfRangeException and queries
+ * the new leader's log end offset, that offset has become higher than the follower's log end offset.
+ *
+ * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+ * follower should truncate its entire log, roll out a new segment and start to fetch from the current leader's log
+ * start offset.
+ * In the second case, the follower should just keep its current log segments and retry the fetch. In the second
+ * case, there will be some data inconsistency between the old leader and the new leader. We are not solving it here.
+ * Users who need a strong consistency guarantee must configure both brokers and producers appropriately
+ * (e.g. unclean.leader.election.enable=false on brokers and acks=all on producers).
+ *
+ * Putting the two cases together, the follower should fetch from the larger of its replica's log end offset
+ * and the current leader's log start offset.
*
- * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
*/
val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
brokerConfig.brokerId)
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
.format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
- replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
- leaderStartOffset
+ val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
+ // Only truncate log when current leader's log start offset is greater than follower's log end offset.
+ if (leaderStartOffset > replica.logEndOffset.messageOffset)
+ replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
+ offsetToFetch
}
}