You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 02:06:34 UTC
svn commit: r1377157 - in
/incubator/kafka/branches/0.8/core/src/main/scala/kafka:
cluster/Partition.scala server/ReplicaManager.scala
Author: junrao
Date: Sat Aug 25 00:06:33 2012
New Revision: 1377157
URL: http://svn.apache.org/viewvc?rev=1377157&view=rev
Log:
update leaderAndISR in ZK conditionally; patched by Yang Ye; reviewed by Jun Rao; KAFKA-428
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1377157&r1=1377156&r2=1377157&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Sat Aug 25 00:06:33 2012
@@ -39,6 +39,9 @@ class Partition(val topic: String,
var inSyncReplicas: Set[Replica] = Set.empty[Replica]
private val assignedReplicaMap = new Pool[Int,Replica]
private val leaderISRUpdateLock = new Object
+ private var zkVersion: Int = LeaderAndISR.initialZKVersion
+ private var leaderEpoch: Int = LeaderAndISR.initialLeaderEpoch - 1
+ this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId)
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -90,71 +93,70 @@ class Partition(val topic: String,
assignedReplicaMap.values.toSet
}
+
+ /**
+ * If the leaderEpoch of the incoming request is higher than the locally cached epoch, make the local replica the new leader or a follower of the new leader.
+ */
+ def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR, isMakingLeader: Boolean): Boolean = {
+ leaderISRUpdateLock synchronized {
+ if (leaderEpoch >= leaderAndISR.leaderEpoch){
+ info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request"
+ .format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
+ return false
+ }
+ if(isMakingLeader)
+ makeLeader(topic, partitionId, leaderAndISR)
+ else
+ makeFollower(topic, partitionId, leaderAndISR)
+ true
+ }
+ }
+
/**
- * If the local replica is not the leader, make the local replica the leader in the following steps.
+ * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
* 1. stop the existing replica fetcher
* 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
* 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
* 4. set the new leader and ISR
*/
- def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = {
- leaderISRUpdateLock synchronized {
- val shouldBecomeLeader = leaderReplicaIdOpt match {
- case Some(leaderReplicaId) => !isReplicaLocal(leaderReplicaId)
- case None => true
- }
- if (shouldBecomeLeader) {
- info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
- // stop replica fetcher thread, if any
- replicaFetcherManager.removeFetcher(topic, partitionId)
-
- val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
- // reset LogEndOffset for remote replicas
- assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
- inSyncReplicas = newInSyncReplicas
- leaderReplicaIdOpt = Some(localBrokerId)
- true
- } else
- false
- }
+ private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
+ trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
+ // stop replica fetcher thread, if any
+ replicaFetcherManager.removeFetcher(topic, partitionId)
+
+ val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
+ // reset LogEndOffset for remote replicas
+ assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
+ inSyncReplicas = newInSyncReplicas
+ leaderEpoch = leaderAndISR.leaderEpoch
+ zkVersion = leaderAndISR.zkVersion
+ leaderReplicaIdOpt = Some(localBrokerId)
}
/**
- * If the local replica is not already following the new leader, make it follow the new leader.
* 1. stop any existing fetcher on this partition from the local replica
* 2. make sure local replica exists and truncate the log to high watermark
* 3. set the leader and set ISR to empty
* 4. start a fetcher to the new leader
*/
- def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = {
- leaderISRUpdateLock synchronized {
+ private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
+ trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
val newLeaderBrokerId: Int = leaderAndISR.leader
info("Starting the follower state transition to follow leader %d for topic %s partition %d"
.format(newLeaderBrokerId, topic, partitionId))
val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head
- val currentLeaderBrokerIdOpt = replicaFetcherManager.fetcherSourceBroker(topic, partitionId)
- // become follower only if it is not already following the same leader
- val shouldBecomeFollower = currentLeaderBrokerIdOpt match {
- case Some(currentLeaderBrokerId) => currentLeaderBrokerId != newLeaderBrokerId
- case None => true
- }
- if(shouldBecomeFollower) {
- info("Becoming follower to leader %d for topic %s partition %d".format(newLeaderBrokerId, topic, partitionId))
- // stop fetcher thread to previous leader
- replicaFetcherManager.removeFetcher(topic, partitionId)
-
- // make sure local replica exists
- val localReplica = getOrCreateReplica()
- localReplica.log.get.truncateTo(localReplica.highWatermark)
- inSyncReplicas = Set.empty[Replica]
- leaderReplicaIdOpt = Some(newLeaderBrokerId)
-
- // start fetcher thread to current leader
- replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
- true
- } else
- false
- }
+ // stop fetcher thread to previous leader
+ replicaFetcherManager.removeFetcher(topic, partitionId)
+
+ // make sure local replica exists
+ val localReplica = getOrCreateReplica()
+ localReplica.log.get.truncateTo(localReplica.highWatermark)
+ inSyncReplicas = Set.empty[Replica]
+ leaderEpoch = leaderAndISR.leaderEpoch
+ zkVersion = leaderAndISR.zkVersion
+ leaderReplicaIdOpt = Some(newLeaderBrokerId)
+ // start fetcher thread to current leader
+ replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
}
def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
@@ -260,14 +262,15 @@ class Partition(val topic: String,
private def updateISR(newISR: Set[Replica]) {
info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
- inSyncReplicas = newISR
- val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId)
- curLeaderAndISR match {
- case None =>
- throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId))
- case Some(m) =>
- m.ISR = newISR.map(r => r.brokerId).toList
- ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString)
+ val newLeaderAndISR = new LeaderAndISR(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
+ val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+ ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
+ if (updateSucceeded){
+ inSyncReplicas = newISR
+ zkVersion = newVersion
+ trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion))
+ } else {
+ info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1377157&r1=1377156&r2=1377157&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Sat Aug 25 00:06:33 2012
@@ -154,7 +154,7 @@ class ReplicaManager(val config: KafkaCo
private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId)
- if (partition.makeLeader(topic, partitionId, leaderAndISR)) {
+ if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
// also add this partition to the list of partitions for which the leader is the current broker
leaderPartitionsLock synchronized {
leaderPartitions += partition
@@ -169,7 +169,7 @@ class ReplicaManager(val config: KafkaCo
.format(leaderBrokerId, topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId)
- if (partition.makeFollower(topic, partitionId, leaderAndISR)) {
+ if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) {
// remove this replica's partition from the ISR expiration queue
leaderPartitionsLock synchronized {
leaderPartitions -= partition