You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/04/05 06:20:02 UTC
git commit: KAFKA-851 Broken handling of leader and isr request leads to incorrect high watermark checkpoint file; reviewed by Jun Rao and Swapnil Ghike
Updated Branches:
refs/heads/0.8 afecc9f23 -> 1d3c343f5
KAFKA-851 Broken handling of leader and isr request leads to incorrect high watermark checkpoint file; reviewed by Jun Rao and Swapnil Ghike
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d3c343f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d3c343f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d3c343f
Branch: refs/heads/0.8
Commit: 1d3c343f57db55076c7e627bc362b4a64e22dcb2
Parents: afecc9f
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Apr 4 21:19:52 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Apr 4 21:19:52 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/cluster/Partition.scala | 7 +++++--
1 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d3c343f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2ca7ee6..aa2092e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -179,13 +179,16 @@ class Partition(val topic: String,
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+ // make sure local replica exists. This reads the last checkpointed high watermark from disk. On startup, it is
+ // important to ensure that this operation happens for every single partition in a leader and isr request; otherwise,
+ // some high watermark values could be overwritten with 0, which leads to replicas fetching from the earliest offset
+ // on the leader
+ val localReplica = getOrCreateReplica()
val newLeaderBrokerId: Int = leaderAndIsr.leader
liveBrokers.find(_.id == newLeaderBrokerId) match {
case Some(leaderBroker) =>
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(topic, partitionId)
- // make sure local replica exists
- val localReplica = getOrCreateReplica()
localReplica.log.get.truncateTo(localReplica.highWatermark)
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch