You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/04/05 06:20:02 UTC

git commit: KAFKA-851 Broken handling of leader and isr request leads to incorrect high watermark checkpoint file; reviewed by Jun Rao and Swapnil Ghike

Updated Branches:
  refs/heads/0.8 afecc9f23 -> 1d3c343f5


KAFKA-851 Broken handling of leader and isr request leads to incorrect high watermark checkpoint file; reviewed by Jun Rao and Swapnil Ghike


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d3c343f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d3c343f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d3c343f

Branch: refs/heads/0.8
Commit: 1d3c343f57db55076c7e627bc362b4a64e22dcb2
Parents: afecc9f
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Apr 4 21:19:52 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Apr 4 21:19:52 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala |    7 +++++--
 1 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d3c343f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2ca7ee6..aa2092e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -179,13 +179,16 @@ class Partition(val topic: String,
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+      // make sure local replica exists. This reads the last check pointed high watermark from disk. On startup, it is
+      // important to ensure that this operation happens for every single partition in a leader and isr request, else
+      // some high watermark values could be overwritten with 0. This leads to replicas fetching from the earliest offset
+      // on the leader
+      val localReplica = getOrCreateReplica()
       val newLeaderBrokerId: Int = leaderAndIsr.leader
       liveBrokers.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
           replicaFetcherManager.removeFetcher(topic, partitionId)
-          // make sure local replica exists
-          val localReplica = getOrCreateReplica()
           localReplica.log.get.truncateTo(localReplica.highWatermark)
           inSyncReplicas = Set.empty[Replica]
           leaderEpoch = leaderAndIsr.leaderEpoch