You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 02:06:34 UTC

svn commit: r1377157 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: cluster/Partition.scala server/ReplicaManager.scala

Author: junrao
Date: Sat Aug 25 00:06:33 2012
New Revision: 1377157

URL: http://svn.apache.org/viewvc?rev=1377157&view=rev
Log:
update leaderAndISR in ZK conditionally; patched by Yang Ye; reviewed by Jun Rao; KAFKA-428

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1377157&r1=1377156&r2=1377157&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Sat Aug 25 00:06:33 2012
@@ -39,6 +39,9 @@ class Partition(val topic: String,
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   private val assignedReplicaMap = new Pool[Int,Replica]
   private val leaderISRUpdateLock = new Object
+  private var zkVersion: Int = LeaderAndISR.initialZKVersion
+  private var leaderEpoch: Int = LeaderAndISR.initialLeaderEpoch - 1
+  this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
@@ -90,71 +93,70 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
+
+  /**
+   *  If the leaderEpoch of the incoming request is higher than the locally cached epoch, make the local replica the new leader or a follower of the new leader.
+   */
+  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR, isMakingLeader: Boolean): Boolean = {
+    leaderISRUpdateLock synchronized {
+      if (leaderEpoch >= leaderAndISR.leaderEpoch){
+        info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request"
+          .format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
+        return false
+      }
+      if(isMakingLeader)
+        makeLeader(topic, partitionId, leaderAndISR)
+      else
+        makeFollower(topic, partitionId, leaderAndISR)
+      true
+    }
+  }
+
   /**
-   *  If the local replica is not the leader, make the local replica the leader in the following steps.
+   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
    *  1. stop the existing replica fetcher
    *  2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = {
-    leaderISRUpdateLock synchronized {
-      val shouldBecomeLeader = leaderReplicaIdOpt match {
-        case Some(leaderReplicaId) => !isReplicaLocal(leaderReplicaId)
-        case None => true
-      }
-      if (shouldBecomeLeader) {
-        info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
-        // stop replica fetcher thread, if any
-        replicaFetcherManager.removeFetcher(topic, partitionId)
-
-        val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
-        // reset LogEndOffset for remote replicas
-        assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
-        inSyncReplicas = newInSyncReplicas
-        leaderReplicaIdOpt = Some(localBrokerId)
-        true
-      } else
-        false
-    }
+  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
+      trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
+      // stop replica fetcher thread, if any
+      replicaFetcherManager.removeFetcher(topic, partitionId)
+
+      val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
+      // reset LogEndOffset for remote replicas
+      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
+      inSyncReplicas = newInSyncReplicas
+      leaderEpoch = leaderAndISR.leaderEpoch
+      zkVersion = leaderAndISR.zkVersion
+      leaderReplicaIdOpt = Some(localBrokerId)
   }
 
   /**
-   *  If the local replica is not already following the new leader, make it follow the new leader.
    *  1. stop any existing fetcher on this partition from the local replica
    *  2. make sure local replica exists and truncate the log to high watermark
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = {
-    leaderISRUpdateLock synchronized  {
+  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
+      trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
       val newLeaderBrokerId: Int = leaderAndISR.leader
       info("Starting the follower state transition to follow leader %d for topic %s partition %d"
                    .format(newLeaderBrokerId, topic, partitionId))
       val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head
-      val currentLeaderBrokerIdOpt = replicaFetcherManager.fetcherSourceBroker(topic, partitionId)
-      // become follower only if it is not already following the same leader
-      val shouldBecomeFollower = currentLeaderBrokerIdOpt match {
-        case Some(currentLeaderBrokerId) => currentLeaderBrokerId != newLeaderBrokerId
-        case None => true
-      }
-      if(shouldBecomeFollower) {
-        info("Becoming follower to leader %d for topic %s partition %d".format(newLeaderBrokerId, topic, partitionId))
-        // stop fetcher thread to previous leader
-        replicaFetcherManager.removeFetcher(topic, partitionId)
-
-        // make sure local replica exists
-        val localReplica = getOrCreateReplica()
-        localReplica.log.get.truncateTo(localReplica.highWatermark)
-        inSyncReplicas = Set.empty[Replica]
-        leaderReplicaIdOpt = Some(newLeaderBrokerId)
-
-        // start fetcher thread to current leader
-        replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
-        true
-      } else
-        false
-    }
+      // stop fetcher thread to previous leader
+      replicaFetcherManager.removeFetcher(topic, partitionId)
+
+      // make sure local replica exists
+      val localReplica = getOrCreateReplica()
+      localReplica.log.get.truncateTo(localReplica.highWatermark)
+      inSyncReplicas = Set.empty[Replica]
+      leaderEpoch = leaderAndISR.leaderEpoch
+      zkVersion = leaderAndISR.zkVersion
+      leaderReplicaIdOpt = Some(newLeaderBrokerId)
+      // start fetcher thread to current leader
+      replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
   }
 
   def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
@@ -260,14 +262,15 @@ class Partition(val topic: String,
 
   private def updateISR(newISR: Set[Replica]) {
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
-    inSyncReplicas = newISR
-    val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId)
-    curLeaderAndISR match {
-      case None =>
-        throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId))
-      case Some(m) =>
-         m.ISR = newISR.map(r => r.brokerId).toList
-         ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString)
+    val newLeaderAndISR = new LeaderAndISR(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
+    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+      ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
+    if (updateSucceeded){
+      inSyncReplicas = newISR
+      zkVersion = newVersion
+      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion))
+    } else {
+      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1377157&r1=1377156&r2=1377157&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Sat Aug 25 00:06:33 2012
@@ -154,7 +154,7 @@ class ReplicaManager(val config: KafkaCo
   private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeLeader(topic, partitionId, leaderAndISR)) {
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -169,7 +169,7 @@ class ReplicaManager(val config: KafkaCo
                  .format(leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeFollower(topic, partitionId, leaderAndISR)) {
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition