You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/09/10 19:09:52 UTC
svn commit: r1382988 [1/2] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/
main/scala/kafka/common/ main/scala/kafka/consumer/
main/scala/kafka/server/ main/scala/kafka/utils/ test/sc...
Author: nehanarkhede
Date: Mon Sep 10 17:09:52 2012
New Revision: 1382988
URL: http://svn.apache.org/viewvc?rev=1382988&view=rev
Log:
KAFKA-498: Controller has race conditions and synchronization bugs; patched by Neha Narkhede; reviewed by Jun Rao
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Mon Sep 10 17:09:52 2012
@@ -24,7 +24,7 @@ import kafka.utils.{Logging, Utils, ZkUt
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import scala.collection.mutable
-import kafka.common.{LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
+import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
object AdminUtils extends Logging {
val rand = new Random
@@ -148,13 +148,11 @@ object AdminUtils extends Logging {
optionalBrokerInfo match {
case Some(brokerInfo) => brokerInfo // return broker info from the cache
case None => // fetch it from zookeeper
- try {
- val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
- cachedBrokerInfo += (id -> brokerInfo)
- brokerInfo
- }catch {
- case e => error("Failed to fetch broker info for broker id " + id)
- throw e
+ ZkUtils.getBrokerInfo(zkClient, id) match {
+ case Some(brokerInfo) =>
+ cachedBrokerInfo += (id -> brokerInfo)
+ brokerInfo
+ case None => throw new BrokerNotAvailableException("Failed to fetch broker info for broker " + id)
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Mon Sep 10 17:09:52 2012
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package kafka.api
-
-import java.nio._
-import kafka.utils._
-import collection.mutable.Map
-import collection.mutable.HashMap
-
-
-object LeaderAndISR {
- val initialLeaderEpoch: Int = 0
- val initialZKVersion: Int = 0
- def readFrom(buffer: ByteBuffer): LeaderAndISR = {
- val leader = buffer.getInt
- val leaderGenId = buffer.getInt
- val ISRString = Utils.readShortString(buffer, "UTF-8")
- val ISR = ISRString.split(",").map(_.toInt).toList
- val zkVersion = buffer.getInt
- new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
- }
-}
-
-case class LeaderAndISR(var leader: Int, var leaderEpoch: Int, var ISR: List[Int], var zkVersion: Int){
- def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndISR.initialLeaderEpoch, ISR, LeaderAndISR.initialZKVersion)
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(leader)
- buffer.putInt(leaderEpoch)
- Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
- buffer.putInt(zkVersion)
- }
-
- def sizeInBytes(): Int = {
- val size = 4 + 4 + (2 + ISR.mkString(",").length) + 4
- size
- }
-
- override def toString(): String = {
- val jsonDataMap = new HashMap[String, String]
- jsonDataMap.put("leader", leader.toString)
- jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
- jsonDataMap.put("ISR", ISR.mkString(","))
- Utils.stringMapToJsonString(jsonDataMap)
- }
-}
-
-
-object LeaderAndISRRequest {
- val CurrentVersion = 1.shortValue()
- val DefaultClientId = ""
- val IsInit: Boolean = true
- val NotInit: Boolean = false
- val DefaultAckTimeout: Int = 1000
-
- def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
- val versionId = buffer.getShort
- val clientId = Utils.readShortString(buffer)
- val isInit = if(buffer.get() == 1.toByte) true else false
- val ackTimeoutMs = buffer.getInt
- val leaderAndISRRequestCount = buffer.getInt
- val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
-
- for(i <- 0 until leaderAndISRRequestCount){
- val topic = Utils.readShortString(buffer, "UTF-8")
- val partition = buffer.getInt
- val leaderAndISRRequest = LeaderAndISR.readFrom(buffer)
-
- leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
- }
- new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos)
- }
-}
-
-
-case class LeaderAndISRRequest (versionId: Short,
- clientId: String,
- isInit: Boolean,
- ackTimeoutMs: Int,
- leaderAndISRInfos: Map[(String, Int), LeaderAndISR])
- extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
- def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
- this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos)
- }
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putShort(versionId)
- Utils.writeShortString(buffer, clientId)
- buffer.put(if(isInit) 1.toByte else 0.toByte)
- buffer.putInt(ackTimeoutMs)
- buffer.putInt(leaderAndISRInfos.size)
- for((key, value) <- leaderAndISRInfos){
- Utils.writeShortString(buffer, key._1, "UTF-8")
- buffer.putInt(key._2)
- value.writeTo(buffer)
- }
- }
-
- def sizeInBytes(): Int = {
- var size = 1 + 2 + (2 + clientId.length) + 4 + 4
- for((key, value) <- leaderAndISRInfos)
- size += (2 + key._1.length) + 4 + value.sizeInBytes
- size
- }
-}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1382988&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Mon Sep 10 17:09:52 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package kafka.api
+
+import java.nio._
+import kafka.utils._
+import collection.mutable.Map
+import collection.mutable.HashMap
+
+
+object LeaderAndIsr {
+ val initialLeaderEpoch: Int = 0
+ val initialZKVersion: Int = 0
+ def readFrom(buffer: ByteBuffer): LeaderAndIsr = {
+ val leader = buffer.getInt
+ val leaderGenId = buffer.getInt
+ val ISRString = Utils.readShortString(buffer, "UTF-8")
+ val ISR = ISRString.split(",").map(_.toInt).toList
+ val zkVersion = buffer.getInt
+ new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion)
+ }
+}
+
+case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){
+ def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(leader)
+ buffer.putInt(leaderEpoch)
+ Utils.writeShortString(buffer, isr.mkString(","), "UTF-8")
+ buffer.putInt(zkVersion)
+ }
+
+ def sizeInBytes(): Int = {
+ val size = 4 + 4 + (2 + isr.mkString(",").length) + 4
+ size
+ }
+
+ override def toString(): String = {
+ val jsonDataMap = new HashMap[String, String]
+ jsonDataMap.put("leader", leader.toString)
+ jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
+ jsonDataMap.put("ISR", isr.mkString(","))
+ Utils.stringMapToJsonString(jsonDataMap)
+ }
+}
+
+
+object LeaderAndIsrRequest {
+ val CurrentVersion = 1.shortValue()
+ val DefaultClientId = ""
+ val IsInit: Boolean = true
+ val NotInit: Boolean = false
+ val DefaultAckTimeout: Int = 1000
+
+ def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
+ val versionId = buffer.getShort
+ val clientId = Utils.readShortString(buffer)
+ val isInit = if(buffer.get() == 1.toByte) true else false
+ val ackTimeoutMs = buffer.getInt
+ val leaderAndISRRequestCount = buffer.getInt
+ val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
+
+ for(i <- 0 until leaderAndISRRequestCount){
+ val topic = Utils.readShortString(buffer, "UTF-8")
+ val partition = buffer.getInt
+ val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer)
+
+ leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
+ }
+ new LeaderAndIsrRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos)
+ }
+}
+
+
+case class LeaderAndIsrRequest (versionId: Short,
+ clientId: String,
+ isInit: Boolean,
+ ackTimeoutMs: Int,
+ leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
+ extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
+ def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
+ this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ Utils.writeShortString(buffer, clientId)
+ buffer.put(if(isInit) 1.toByte else 0.toByte)
+ buffer.putInt(ackTimeoutMs)
+ buffer.putInt(leaderAndISRInfos.size)
+ for((key, value) <- leaderAndISRInfos){
+ Utils.writeShortString(buffer, key._1, "UTF-8")
+ buffer.putInt(key._2)
+ value.writeTo(buffer)
+ }
+ }
+
+ def sizeInBytes(): Int = {
+ var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+ for((key, value) <- leaderAndISRInfos)
+ size += (2 + key._1.length) + 4 + value.sizeInBytes
+ size
+ }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Mon Sep 10 17:09:52 2012
@@ -19,7 +19,7 @@ package kafka.cluster
import kafka.utils.Utils._
import java.nio.ByteBuffer
-import kafka.common.BrokerNotExistException
+import kafka.common.BrokerNotAvailableException
/**
* A Kafka broker
@@ -28,7 +28,7 @@ private[kafka] object Broker {
def createBroker(id: Int, brokerInfoString: String): Broker = {
if(brokerInfoString == null)
- throw new BrokerNotExistException("Broker id %s does not exist".format(id))
+ throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
val brokerInfo = brokerInfoString.split(":")
new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Mon Sep 10 17:09:52 2012
@@ -19,7 +19,7 @@ package kafka.cluster
import scala.collection._
import kafka.utils._
import java.lang.Object
-import kafka.api.LeaderAndISR
+import kafka.api.LeaderAndIsr
import kafka.server.ReplicaManager
import kafka.common.ErrorMapping
@@ -39,8 +39,8 @@ class Partition(val topic: String,
var inSyncReplicas: Set[Replica] = Set.empty[Replica]
private val assignedReplicaMap = new Pool[Int,Replica]
private val leaderISRUpdateLock = new Object
- private var zkVersion: Int = LeaderAndISR.initialZKVersion
- private var leaderEpoch: Int = LeaderAndISR.initialLeaderEpoch - 1
+ private var zkVersion: Int = LeaderAndIsr.initialZKVersion
+ private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId)
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -53,7 +53,7 @@ class Partition(val topic: String,
if (isReplicaLocal(replicaId)) {
val log = logManager.getOrCreateLog(topic, partitionId)
val localReplica = new Replica(replicaId, this, time,
- highwaterMarkCheckpoint.read(topic, partitionId), Some(log))
+ highwaterMarkCheckpoint.read(topic, partitionId), Some(log))
addReplicaIfNotExists(localReplica)
}
else {
@@ -97,7 +97,7 @@ class Partition(val topic: String,
/**
* If the leaderEpoch of the incoming request is higher than the locally cached epoch, make this broker the new leader or a follower of the new leader.
*/
- def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR, isMakingLeader: Boolean): Boolean = {
+ def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
leaderISRUpdateLock synchronized {
if (leaderEpoch >= leaderAndISR.leaderEpoch){
info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request"
@@ -119,20 +119,20 @@ class Partition(val topic: String,
* 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
* 4. set the new leader and ISR
*/
- private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
- trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
- // stop replica fetcher thread, if any
- replicaFetcherManager.removeFetcher(topic, partitionId)
-
- val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
- // reset LogEndOffset for remote replicas
- assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
- inSyncReplicas = newInSyncReplicas
- leaderEpoch = leaderAndISR.leaderEpoch
- zkVersion = leaderAndISR.zkVersion
- leaderReplicaIdOpt = Some(localBrokerId)
- // we may need to increment high watermark since ISR could be down to 1
- maybeIncrementLeaderHW(getReplica().get)
+ private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
+ trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
+ // stop replica fetcher thread, if any
+ replicaFetcherManager.removeFetcher(topic, partitionId)
+
+ val newInSyncReplicas = leaderAndISR.isr.map(r => getOrCreateReplica(r)).toSet
+ // reset LogEndOffset for remote replicas
+ assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
+ inSyncReplicas = newInSyncReplicas
+ leaderEpoch = leaderAndISR.leaderEpoch
+ zkVersion = leaderAndISR.zkVersion
+ leaderReplicaIdOpt = Some(localBrokerId)
+ // we may need to increment high watermark since ISR could be down to 1
+ maybeIncrementLeaderHW(getReplica().get)
}
/**
@@ -141,24 +141,28 @@ class Partition(val topic: String,
* 3. set the leader and set ISR to empty
* 4. start a fetcher to the new leader
*/
- private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
- trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
- val newLeaderBrokerId: Int = leaderAndISR.leader
- info("Starting the follower state transition to follow leader %d for topic %s partition %d"
- .format(newLeaderBrokerId, topic, partitionId))
- val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head
- // stop fetcher thread to previous leader
- replicaFetcherManager.removeFetcher(topic, partitionId)
-
- // make sure local replica exists
- val localReplica = getOrCreateReplica()
- localReplica.log.get.truncateTo(localReplica.highWatermark)
- inSyncReplicas = Set.empty[Replica]
- leaderEpoch = leaderAndISR.leaderEpoch
- zkVersion = leaderAndISR.zkVersion
- leaderReplicaIdOpt = Some(newLeaderBrokerId)
- // start fetcher thread to current leader
- replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+ private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
+ trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
+ val newLeaderBrokerId: Int = leaderAndISR.leader
+ info("Starting the follower state transition to follow leader %d for topic %s partition %d"
+ .format(newLeaderBrokerId, topic, partitionId))
+ ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match {
+ case Some(leaderBroker) =>
+ // stop fetcher thread to previous leader
+ replicaFetcherManager.removeFetcher(topic, partitionId)
+ // make sure local replica exists
+ val localReplica = getOrCreateReplica()
+ localReplica.log.get.truncateTo(localReplica.highWatermark)
+ inSyncReplicas = Set.empty[Replica]
+ leaderEpoch = leaderAndISR.leaderEpoch
+ zkVersion = leaderAndISR.zkVersion
+ leaderReplicaIdOpt = Some(newLeaderBrokerId)
+ // start fetcher thread to current leader
+ replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+ case None => // leader went down
+ warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
+ " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+ }
}
def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
@@ -197,7 +201,7 @@ class Partition(val topic: String,
})
trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
- (requiredAcks > 0 && numAcks >= requiredAcks)) {
+ (requiredAcks > 0 && numAcks >= requiredAcks)) {
/*
* requiredAcks < 0 means acknowledge after all replicas in ISR
* are fully caught up to the (local) leader's offset
@@ -211,7 +215,7 @@ class Partition(val topic: String,
}
}
}
-
+
def maybeIncrementLeaderHW(leaderReplica: Replica) {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min
@@ -220,7 +224,7 @@ class Partition(val topic: String,
leaderReplica.highWatermark = newHighWatermark
else
debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s"
- .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
+ .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}
def maybeShrinkISR(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) {
@@ -249,7 +253,7 @@ class Partition(val topic: String,
* for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
* follower is not catching up and should be removed from the ISR
- **/
+ **/
val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
// Case 1 above
@@ -266,9 +270,9 @@ class Partition(val topic: String,
private def updateISR(newISR: Set[Replica]) {
info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
- val newLeaderAndISR = new LeaderAndISR(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
+ val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
- ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
+ ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
if (updateSucceeded){
inSyncReplicas = newISR
zkVersion = newVersion
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala?rev=1382988&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala Mon Sep 10 17:09:52 2012
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class BrokerNotAvailableException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala Mon Sep 10 17:09:52 2012
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class BrokerNotExistException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Mon Sep 10 17:09:52 2012
@@ -38,7 +38,7 @@ object ErrorMapping {
val LeaderNotAvailableCode : Short = 6
val NotLeaderForPartitionCode : Short = 7
val RequestTimedOutCode: Short = 8
- val BrokerNotExistInZookeeperCode: Short = 9
+ val BrokerNotAvailableCode: Short = 9
val ReplicaNotAvailableCode: Short = 10
private val exceptionToCode =
@@ -51,7 +51,7 @@ object ErrorMapping {
classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
- classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode,
+ classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
).withDefaultValue(UnknownCode)
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala?rev=1382988&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala Mon Sep 10 17:09:52 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
+ * all the replicas for a partition are offline
+ */
+class PartitionOfflineException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Sep 10 17:09:52 2012
@@ -292,21 +292,21 @@ private[kafka] class ZookeeperConsumerCo
return partitionInfo.getConsumeOffset
}
- //otherwise, try to get it from zookeeper
+ // otherwise, try to get it from zookeeper
try {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
val znode = topicDirs.consumerOffsetDir + "/" + partitionId
val offsetString = readDataMaybeNull(zkClient, znode)._1
- if (offsetString != null)
- return offsetString.toLong
- else
- return -1
+ offsetString match {
+ case Some(offset) => offset.toLong
+ case None => -1L
+ }
}
catch {
case e =>
error("error in getConsumedOffset JMX ", e)
+ -2L
}
- return -2
}
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
@@ -649,18 +649,19 @@ private[kafka] class ZookeeperConsumerCo
val znode = topicDirs.consumerOffsetDir + "/" + partition
val offsetString = readDataMaybeNull(zkClient, znode)._1
// If first time starting a consumer, set the initial offset based on the config
- var offset : Long = 0L
- if (offsetString == null)
- offset = config.autoOffsetReset match {
+ val offset =
+ offsetString match {
+ case Some(offsetStr) => offsetStr.toLong
+ case None =>
+ config.autoOffsetReset match {
case OffsetRequest.SmallestTimeString =>
- earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
+ earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
case OffsetRequest.LargestTimeString =>
- earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
+ earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
case _ =>
- throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
+ throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
+ }
}
- else
- offset = offsetString.toLong
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Sep 10 17:09:52 2012
@@ -66,7 +66,7 @@ class KafkaApis(val requestChannel: Requ
}
def handleLeaderAndISRRequest(request: RequestChannel.Request){
- val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
+ val leaderAndISRRequest = LeaderAndIsrRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
trace("Handling leader and isr request " + leaderAndISRRequest)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Mon Sep 10 17:09:52 2012
@@ -22,20 +22,21 @@ import collection.immutable.Set
import kafka.cluster.Broker
import kafka.api._
import kafka.network.{Receive, BlockingChannel}
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
import org.apache.zookeeper.Watcher.Event.KeeperState
import collection.JavaConversions._
import kafka.utils.{ShutdownableThread, ZkUtils, Logging}
import java.lang.Object
+import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
+import kafka.common.{KafkaException, PartitionOfflineException}
class RequestSendThread(val controllerId: Int,
val toBrokerId: Int,
val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
val channel: BlockingChannel)
- extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)){
+ extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
private val lock = new Object()
override def doWork(): Unit = {
@@ -44,163 +45,157 @@ class RequestSendThread(val controllerId
val callback = queueItem._2
var receive: Receive = null
-
- try{
- lock synchronized {
- channel.send(request)
- receive = channel.receive()
- var response: RequestOrResponse = null
- request.requestId.get match {
- case RequestKeys.LeaderAndISRRequest =>
- response = LeaderAndISRResponse.readFrom(receive.buffer)
- case RequestKeys.StopReplicaRequest =>
- response = StopReplicaResponse.readFrom(receive.buffer)
- }
- trace("got a response %s".format(controllerId, response, toBrokerId))
- if(callback != null){
- callback(response)
- }
- }
- } catch {
- case e =>
- // log it and let it go. Let controller shut it down.
- debug("Exception occurs", e)
+ try{
+ lock synchronized {
+ channel.send(request)
+ receive = channel.receive()
+ var response: RequestOrResponse = null
+ request.requestId.get match {
+ case RequestKeys.LeaderAndISRRequest =>
+ response = LeaderAndISRResponse.readFrom(receive.buffer)
+ case RequestKeys.StopReplicaRequest =>
+ response = StopReplicaResponse.readFrom(receive.buffer)
+ }
+ trace("got a response %s".format(controllerId, response, toBrokerId))
+
+ if(callback != null){
+ callback(response)
}
}
-
+ } catch {
+ case e =>
+ // log it and let it go. Let controller shut it down.
+ debug("Exception occurs", e)
+ }
+ }
}
-class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{
- private val brokers = new HashMap[Int, Broker]
- private val messageChannels = new HashMap[Int, BlockingChannel]
- private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
- private val messageThreads = new HashMap[Int, RequestSendThread]
- private val lock = new Object
+class ControllerChannelManager private (config: KafkaConfig) extends Logging {
+ private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
+ private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
- for(broker <- allBrokers){
- brokers.put(broker.id, broker)
- info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
- val channel = new BlockingChannel(broker.host, broker.port,
- BlockingChannel.UseDefaultBufferSize,
- BlockingChannel.UseDefaultBufferSize,
- config.controllerSocketTimeoutMs)
- channel.connect()
- messageChannels.put(broker.id, channel)
- messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+
+ def this(allBrokers: Set[Broker], config : KafkaConfig) {
+ this(config)
+ allBrokers.foreach(addNewBroker(_))
}
def startup() = {
- for((brokerId, broker) <- brokers){
- val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId), messageChannels(brokerId))
- thread.setDaemon(false)
- thread.start()
- messageThreads.put(broker.id, thread)
+ brokerLock synchronized {
+ brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
}
}
def shutdown() = {
- lock synchronized {
- for((brokerId, broker) <- brokers){
- removeBroker(brokerId)
- }
+ brokerLock synchronized {
+ brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1))
}
}
- def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null){
- messageQueues(brokerId).put((request, callback))
+ def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
+ brokerLock synchronized {
+ brokerStateInfo(brokerId).messageQueue.put((request, callback))
+ }
}
- def addBroker(broker: Broker){
- lock synchronized {
- brokers.put(broker.id, broker)
- messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
- val channel = new BlockingChannel(broker.host, broker.port,
- BlockingChannel.UseDefaultBufferSize,
- BlockingChannel.UseDefaultBufferSize,
- config.controllerSocketTimeoutMs)
- channel.connect()
- messageChannels.put(broker.id, channel)
- val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
- thread.setDaemon(false)
- thread.start()
- messageThreads.put(broker.id, thread)
+ def addBroker(broker: Broker) {
+ brokerLock synchronized {
+ addNewBroker(broker)
+ startRequestSendThread(broker.id)
}
}
- def removeBroker(brokerId: Int){
- lock synchronized {
- brokers.remove(brokerId)
- try {
- messageChannels(brokerId).disconnect()
- messageChannels.remove(brokerId)
- messageQueues.remove(brokerId)
- messageThreads(brokerId).shutdown()
- messageThreads.remove(brokerId)
- }catch {
- case e => error("Error while removing broker by the controller", e)
- }
+ def removeBroker(brokerId: Int) {
+ brokerLock synchronized {
+ removeExistingBroker(brokerId)
+ }
+ }
+
+ private def addNewBroker(broker: Broker) {
+ val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+ val channel = new BlockingChannel(broker.host, broker.port,
+ BlockingChannel.UseDefaultBufferSize,
+ BlockingChannel.UseDefaultBufferSize,
+ config.controllerSocketTimeoutMs)
+ channel.connect()
+ val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel)
+ requestThread.setDaemon(false)
+ brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
+ }
+
+ private def removeExistingBroker(brokerId: Int) {
+ try {
+ brokerStateInfo(brokerId).channel.disconnect()
+ brokerStateInfo(brokerId).requestSendThread.shutdown()
+ brokerStateInfo.remove(brokerId)
+ }catch {
+ case e => error("Error while removing broker by the controller", e)
}
}
+
+ private def startRequestSendThread(brokerId: Int) {
+ brokerStateInfo(brokerId).requestSendThread.start()
+ }
}
+case class ControllerBrokerStateInfo(channel: BlockingChannel,
+ broker: Broker,
+ messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+ requestSendThread: RequestSendThread)
+
class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
this.logIdent = "[Controller " + config.brokerId + "], "
- info("startup");
private var isRunning = true
private val controllerLock = new Object
private var controllerChannelManager: ControllerChannelManager = null
- private var allBrokers : Set[Broker] = null
- private var allBrokerIds : Set[Int] = null
+ private var liveBrokers : Set[Broker] = null
+ private var liveBrokerIds : Set[Int] = null
private var allTopics: Set[String] = null
private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
private var allLeaders: mutable.Map[(String, Int), Int] = null
- // Return true if this controller succeeds in the controller competition
+ // Return true if this controller succeeds in the controller leader election
private def tryToBecomeController(): Boolean = {
- try {
- ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
- // Only the broker successfully registering as the controller can execute following code, otherwise
- // some exception will be thrown.
- registerBrokerChangeListener()
- registerTopicChangeListener()
- allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
- allBrokerIds = allBrokers.map(_.id)
- info("all brokers: %s".format(allBrokerIds))
- allTopics = ZkUtils.getAllTopics(zkClient).toSet
- info("all topics: %s".format(allTopics))
- allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
- info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment))
- allLeaders = new mutable.HashMap[(String, Int), Int]
- controllerChannelManager = new ControllerChannelManager(allBrokers, config)
- controllerChannelManager.startup()
- return true
- } catch {
- case e: ZkNodeExistsException =>
- registerControllerExistListener()
- info("broker didn't succeed registering as the controller since it's taken by someone else")
- return false
- case e2 => throw e2
- }
- }
-
- private def controllerRegisterOrFailover(){
- if(!isRunning){
- info("controller has already been shut down, don't need to compete for lead controller any more")
- return
- }
- info("try to become controller")
- if(tryToBecomeController() == true){
- info("won the controller competition and work on leader and isr recovery")
- deliverLeaderAndISRFromZookeeper(allBrokerIds, allTopics)
- debug("work on broker changes")
- onBrokerChange()
-
- // If there are some partition with leader not initialized, init the leader for them
- val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1))
- debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
- initLeaders(partitionReplicaAssignment)
- }
+ val controllerStatus =
+ try {
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
+ // Only the broker elected as the new controller can execute following code, otherwise
+ // some exception will be thrown.
+ registerBrokerChangeListener()
+ registerTopicChangeListener()
+ liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+ liveBrokerIds = liveBrokers.map(_.id)
+ info("Currently active brokers in the cluster: %s".format(liveBrokerIds))
+ allTopics = ZkUtils.getAllTopics(zkClient).toSet
+ info("Current list of topics in the cluster: %s".format(allTopics))
+ allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
+ info("Partition replica assignment: %s".format(allPartitionReplicaAssignment))
+ allLeaders = new mutable.HashMap[(String, Int), Int]
+ controllerChannelManager = new ControllerChannelManager(liveBrokers, config)
+ controllerChannelManager.startup()
+ true
+ } catch {
+ case e: ZkNodeExistsException =>
+ registerControllerExistsListener()
+ false
+ case e2 => throw e2
+ }
+ controllerStatus
+ }
+
+ private def controllerRegisterOrFailover() {
+ if(isRunning) {
+ if(tryToBecomeController()) {
+ readAndSendLeaderAndIsrFromZookeeper(liveBrokerIds, allTopics)
+ onBrokerChange()
+ // If there are partitions whose leader is not initialized, initialize the leader for them
+ val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1))
+ debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
+ initLeaders(partitionReplicaAssignment)
+ }
+ }else
+ info("Controller has been shut down, aborting startup procedure")
}
def isActive(): Boolean = {
@@ -209,20 +204,22 @@ class KafkaController(config : KafkaConf
def startup() = {
controllerLock synchronized {
+ info("Controller starting up");
registerSessionExpirationListener()
- registerControllerExistListener()
+ registerControllerExistsListener()
isRunning = true
controllerRegisterOrFailover()
+ info("Controller startup complete")
}
}
def shutdown() = {
controllerLock synchronized {
- if(controllerChannelManager != null){
- info("shut down")
+ if(controllerChannelManager != null) {
+ info("Controller shutting down")
controllerChannelManager.shutdown()
controllerChannelManager = null
- info("shutted down completely")
+ info("Controller shutdown complete")
}
isRunning = false
}
@@ -244,8 +241,8 @@ class KafkaController(config : KafkaConf
zkClient.subscribeStateChanges(new SessionExpireListener())
}
- private def registerControllerExistListener(){
- zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener())
+ private def registerControllerExistsListener(){
+ zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistsListener())
}
class SessionExpireListener() extends IZkStateListener with Logging {
@@ -265,199 +262,265 @@ class KafkaController(config : KafkaConf
@throws(classOf[Exception])
def handleNewSession() {
controllerLock synchronized {
- if(controllerChannelManager != null){
+ if(controllerChannelManager != null) {
info("session expires, clean up the state")
controllerChannelManager.shutdown()
controllerChannelManager = null
}
+ controllerRegisterOrFailover()
}
- controllerRegisterOrFailover()
}
}
/**
- * Used to populate the leaderAndISR from zookeeper to affected brokers when the brokers comes up
+ * @param brokerIds The set of currently active brokers in the cluster, as known to the controller
+ * @param topics The set of topics known to the controller by reading from zookeeper
+ * This API reads the list of partitions that exist for all the topics in the specified list of input topics.
+ * For each of those partitions, it reads the assigned replica list so that it can send the appropriate leader and
+ * isr state change request to all the brokers in the assigned replica list. It arranges the leader and isr state
+ * change requests by broker id. At the end, it circles through this map, sending the required INIT state change requests
+ * to each broker. This API is called when -
+ * 1. A new broker starts up
+ * 2. A new controller is elected
*/
- private def deliverLeaderAndISRFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
- val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRForTopics(zkClient, topics.iterator)
- val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
- for((topicPartition, leaderAndISR) <- leaderAndISRInfos){
- // If the leader specified in the leaderAndISR is no longer alive, there is no need to recover it
- if(allBrokerIds.contains(leaderAndISR.leader)){
- val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
- if(brokersAssignedToThisPartitionOpt == None){
- warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
- } else{
- val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
- relatedBrokersAssignedToThisPartition.foreach(b => {
- if(!brokerToLeaderAndISRInfosMap.contains(b))
- brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
- brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
- })
- allLeaders.put(topicPartition, leaderAndISR.leader)
- }
- } else
- debug("during leaderAndISR recovery, the leader %d is not alive any more, just ignore it".format(leaderAndISR.leader))
+ private def readAndSendLeaderAndIsrFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
+ val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topics.iterator)
+ val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+ for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) {
+ // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
+ liveBrokerIds.contains(leaderAndIsr.leader) match {
+ case true =>
+ val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
+ brokersAssignedToThisPartitionOpt match {
+ case Some(brokersAssignedToThisPartition) =>
+ val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
+ relatedBrokersAssignedToThisPartition.foreach(b => {
+ brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
+ brokerToLeaderAndIsrInfoMap(b).put(topicPartition, leaderAndIsr)
+ })
+ allLeaders.put(topicPartition, leaderAndIsr.leader)
+ case None => warn(("While refreshing controller's leader and isr cache, no replica assignment was found " +
+ "for partition [%s, %d]. Rest of the partition replica assignment is %s").format(topicPartition._1,
+ topicPartition._2, allPartitionReplicaAssignment))
+ }
+ case false =>
+ debug("While refreshing controller's leader and isr cache, broker %d is not alive any more, just ignore it"
+ .format(leaderAndIsr.leader))
+ }
}
- info("during leaderAndISR recovery, the broker to request map is [%s]".format(brokerToLeaderAndISRInfosMap.toString()))
+ debug(("While refreshing controller's leader and isr cache, the state change requests for each broker is " +
+ "[%s]").format(brokerToLeaderAndIsrInfoMap.toString()))
- brokerToLeaderAndISRInfosMap.foreach(m =>{
+ brokerToLeaderAndIsrInfoMap.foreach(m =>{
val broker = m._1
- val leaderAndISRs = m._2
- val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRs)
- info("during leaderAndISR recovery, the leaderAndISRRequest sent to new broker [%s] is [%s]".format(broker, leaderAndISRRequest.toString))
- sendRequest(broker, leaderAndISRRequest)
+ val leaderAndIsrs = m._2
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.IsInit, leaderAndIsrs)
+ info("After refreshing controller's leader and isr cache, the leader and ISR change state change request sent to" +
+ " new broker [%s] is [%s]".format(broker, leaderAndIsrRequest.toString))
+ sendRequest(broker, leaderAndIsrRequest)
})
-
- info("after leaderAndISR recovery for brokers %s, the leaders assignment is %s".format(brokerIds, allLeaders))
+ info("After refreshing controller's leader and isr cache for brokers %s, the leaders assignment is %s"
+ .format(brokerIds, allLeaders))
}
-
private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) {
- val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndISR]]
+ val brokerToLeaderAndISRInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndIsr]]
for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) {
- val liveAssignedReplicas = replicaAssignment.filter(r => allBrokerIds.contains(r))
+ val liveAssignedReplicas = replicaAssignment.filter(r => liveBrokerIds.contains(r))
debug("for topic [%s], partition [%d], live assigned replicas are: [%s]"
- .format(topicPartition._1,
- topicPartition._2,
- liveAssignedReplicas))
- if(!liveAssignedReplicas.isEmpty){
+ .format(topicPartition._1,
+ topicPartition._2,
+ liveAssignedReplicas))
+ if(!liveAssignedReplicas.isEmpty) {
debug("live assigned replica is not empty, check zkClient: %s".format(zkClient))
val leader = liveAssignedReplicas.head
- var leaderAndISR: LeaderAndISR = null
+ var leaderAndISR: LeaderAndIsr = null
var updateLeaderISRZKPathSucceeded: Boolean = false
- while(!updateLeaderISRZKPathSucceeded){
- val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
+ while(!updateLeaderISRZKPathSucceeded) {
+ val curLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topicPartition._1, topicPartition._2)
debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient))
if(curLeaderAndISROpt == None){
debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2))
- leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList)
- ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
+ leaderAndISR = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+ ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
updateLeaderISRZKPathSucceeded = true
- } else{
- debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is not empty".format(topicPartition._1, topicPartition._2))
+ } else {
+ debug("During initializing leader of parition (%s, %d),".format(topicPartition._1, topicPartition._2) +
+ " the current leader and isr in zookeeper is not empty")
val curZkPathVersion = curLeaderAndISROpt.get.zkVersion
- leaderAndISR = new LeaderAndISR(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList, curLeaderAndISROpt.get.zkVersion + 1)
- val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
- if(updateSucceeded){
+ leaderAndISR = new LeaderAndIsr(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList,
+ curLeaderAndISROpt.get.zkVersion + 1)
+ val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+ ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2),
+ leaderAndISR.toString, curZkPathVersion)
+ if(updateSucceeded) {
leaderAndISR.zkVersion = newVersion
}
updateLeaderISRZKPathSucceeded = updateSucceeded
}
}
liveAssignedReplicas.foreach(b => {
- if(!brokerToLeaderAndISRInfosMap.contains(b))
- brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
- brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+ if(!brokerToLeaderAndISRInfoMap.contains(b))
+ brokerToLeaderAndISRInfoMap.put(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
+ brokerToLeaderAndISRInfoMap(b).put(topicPartition, leaderAndISR)
}
)
allLeaders.put(topicPartition, leaderAndISR.leader)
}
else{
- warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), allBrokerIds))
+ warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds))
}
}
- info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfosMap))
- brokerToLeaderAndISRInfosMap.foreach(m =>{
+ info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfoMap))
+ brokerToLeaderAndISRInfoMap.foreach(m =>{
val broker = m._1
val leaderAndISRs = m._2
- val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRs)
+ val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRs)
info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest))
sendRequest(broker, leaderAndISRRequest)
})
}
-
- private def onBrokerChange(newBrokers: Set[Int] = null){
+ /**
+ * @param newBrokers The list of brokers that are started up. This is an optional argument that can be empty when
+ * a new controller is being elected
+ * The purpose of this API is to send the leader state change request to all live replicas of partitions that
+ * currently don't have an alive leader. It first finds the partitions with dead leaders, then it looks up the list
+ * of assigned replicas for those partitions that are alive. It reads the leader and isr info for those partitions
+ * from zookeeper.
+ * It can happen that when the controller is in the middle of updating the new leader info in zookeeper,
+ * the leader changes the ISR for the partition. Due to this, the zookeeper path's version will be different than
+ * what was known to the controller. So its new leader update will fail. The controller retries the leader election
+ * based on the new ISR until its leader update in zookeeper succeeds.
+ * Once the write to zookeeper succeeds, it sends the leader state change request to the live assigned replicas for
+ * each affected partition.
+ */
+ private def onBrokerChange(newBrokers: Set[Int] = Set.empty[Int]) {
/** handle the new brokers, send request for them to initialize the local log **/
- if(newBrokers != null && newBrokers.size != 0)
- deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
+ if(newBrokers.size != 0)
+ readAndSendLeaderAndIsrFromZookeeper(newBrokers, allTopics)
/** handle leader election for the partitions whose leader is no longer alive **/
- val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
- allLeaders.foreach(m =>{
- val topicPartition = m._1
- val leader = m._2
- // We only care about the partitions, whose leader is no longer alive
- if(!allBrokerIds.contains(leader)){
- var updateLeaderISRZKPathSucceeded: Boolean = false
- while(!updateLeaderISRZKPathSucceeded){
- val assignedReplicasOpt = allPartitionReplicaAssignment.get(topicPartition)
- if(assignedReplicasOpt == None)
- throw new IllegalStateException("On broker changes, the assigned replica for [%s, %d], shouldn't be None, the general assignment is %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
- val assignedReplicas = assignedReplicasOpt.get
- val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => allBrokerIds.contains(r))
- val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
- if(curLeaderAndISROpt == None){
- throw new IllegalStateException("On broker change, there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topicPartition._1, topicPartition._2))
- }
- val curLeaderAndISR = curLeaderAndISROpt.get
- val leader = curLeaderAndISR.leader
- var newLeader: Int = -1
- val leaderEpoch = curLeaderAndISR.leaderEpoch
- val ISR = curLeaderAndISR.ISR
- val curZkPathVersion = curLeaderAndISR.zkVersion
- debug("leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]".format(topicPartition._1, topicPartition._2, leader, leaderEpoch, ISR, curZkPathVersion))
- // The leader is no longer alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader
- var leaderAndISR: LeaderAndISR = null
- // The ISR contains at least 1 broker in the live broker list
- val liveBrokersInISR = ISR.filter(r => allBrokerIds.contains(r))
- if(!liveBrokersInISR.isEmpty){
- newLeader = liveBrokersInISR.head
- leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch +1, liveBrokersInISR.toList, curZkPathVersion + 1)
- debug("some broker in ISR is alive, new leader and ISR is %s".format(leaderAndISR.toString()))
- } else{
- debug("live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicasToThisPartition))
- if (!liveAssignedReplicasToThisPartition.isEmpty){
- newLeader = liveAssignedReplicasToThisPartition.head
- leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch + 1, List(newLeader), curZkPathVersion + 1)
- warn("on broker change, no broker in ISR is alive, new leader elected is [%s], there's potential data loss".format(newLeader))
- } else
- error("on broker change, for partition ([%s, %d]), live brokers are: [%s], assigned replicas are: [%s]; no asigned replica is alive".format(topicPartition._1, topicPartition._2, allBrokerIds, assignedReplicas))
- }
- debug("the leader and ISR converted string: [%s]".format(leaderAndISR))
- val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
- if(updateSucceeded){
- leaderAndISR.zkVersion = newVersion
- liveAssignedReplicasToThisPartition.foreach(b => {
- if(!brokerToLeaderAndISRInfosMap.contains(b))
- brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
- brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
- })
- allLeaders.put(topicPartition, newLeader)
- info("on broker changes, allLeader is updated to %s".format(allLeaders))
- }
- updateLeaderISRZKPathSucceeded = updateSucceeded
+ val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+ // retain only partitions whose leaders are not alive
+ val partitionsWithDeadLeaders = allLeaders.filter(partitionAndLeader => !liveBrokerIds.contains(partitionAndLeader._2))
+ partitionsWithDeadLeaders.foreach { partitionAndLeader =>
+ val topic = partitionAndLeader._1._1
+ val partition = partitionAndLeader._1._2
+
+ try {
+ allPartitionReplicaAssignment.get((topic, partition)) match {
+ case Some(assignedReplicas) =>
+ val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r))
+ ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+ case Some(currentLeaderAndIsr) =>
+ try {
+ // elect new leader or throw exception
+ val newLeaderAndIsr = electLeaderForPartition(topic, partition, currentLeaderAndIsr, assignedReplicas)
+ // store new leader and isr info in cache
+ liveAssignedReplicasToThisPartition.foreach { b =>
+ brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
+ brokerToLeaderAndIsrInfoMap(b).put((topic, partition), newLeaderAndIsr)
+ }
+ }catch {
+ case e => error("Error while electing leader for partition [%s, %d]".format(topic, partition))
+ }
+ case None => throw new KafkaException(("On broker changes, " +
+ "there's no leaderAndISR information for partition (%s, %d) in zookeeper").format(topic, partition))
+ }
+ case None => throw new KafkaException(("While handling broker changes, the " +
+ "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s")
+ .format(topic, partition, allPartitionReplicaAssignment))
}
+ }catch {
+ case e: PartitionOfflineException =>
+ error("All replicas for partition [%s, %d] are dead.".format(topic, partition) +
+ " Marking this partition offline")
}
- })
- brokerToLeaderAndISRInfosMap.foreach(m => {
+ }
+ debug("After leader election, leader cache is updated to %s".format(allLeaders))
+ brokerToLeaderAndIsrInfoMap.foreach(m => {
val broker = m._1
- val leaderAndISRInfos = m._2
- val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
+ val leaderAndISRInfo = m._2
+ val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRInfo)
sendRequest(broker, leaderAndISRRequest)
- info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(broker, leaderAndISRRequest))
+ info("On broker changes, the LeaderAndIsrRequest send to broker [%d] is [%s]".format(broker, leaderAndISRRequest))
})
}
+ /**
+  * Elects a new leader for the input partition and records the decision in zookeeper.
+  *
+  * This API selects a new leader for the input partition -
+  * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
+  * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+  * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+  * The decision is written to zookeeper with a conditional update keyed on the zkVersion of the state the
+  * election was based on. If that update does not succeed, the leader and isr state is re-read from
+  * zookeeper and the election is repeated against the fresh state. Retrying with the state that was
+  * passed in would reuse the same zkVersion on every attempt (presumably the reason the update failed),
+  * so the loop could spin forever.
+  * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
+  * TODO: If a leader cannot be elected for a partition, it should be marked offline and exposed through some metric
+  *
+  * @param topic The topic of the partition whose leader needs to be elected
+  * @param partition The partition whose leader needs to be elected
+  * @param currentLeaderAndIsr The leader and isr information stored for this partition in zookeeper
+  * @param assignedReplicas The list of replicas assigned to the input partition
+  * @return The leader and isr that was written to zookeeper, with zkVersion set to the version returned by the write
+  * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+  * @throws KafkaException If, while retrying, zookeeper no longer has leader and isr information for the partition
+  */
+ private def electLeaderForPartition(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr,
+                                     assignedReplicas: Seq[Int]):LeaderAndIsr = {
+   var zookeeperPathUpdateSucceeded: Boolean = false
+   // the state each election attempt is based on: starts as the caller's snapshot, refreshed after a failed update
+   var latestLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
+   var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
+   while(!zookeeperPathUpdateSucceeded) {
+     // liveBrokerIds is re-evaluated on every attempt
+     val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r))
+     val liveBrokersInIsr = latestLeaderAndIsr.isr.filter(r => liveBrokerIds.contains(r))
+     val currentLeaderEpoch = latestLeaderAndIsr.leaderEpoch
+     val currentLeaderIsrZkPathVersion = latestLeaderAndIsr.zkVersion
+     debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
+       .format(topic, partition, latestLeaderAndIsr.leader, currentLeaderEpoch, latestLeaderAndIsr.isr,
+       currentLeaderIsrZkPathVersion))
+     newLeaderAndIsr =
+       if(!liveBrokersInIsr.isEmpty) {
+         // preferred case: some in-sync replica is alive, so the isr shrinks to its live members
+         val newLeader = liveBrokersInIsr.head
+         debug("Some broker in ISR is alive, picking the leader from the ISR: " + newLeader)
+         new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
+       } else {
+         debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
+           .format(liveAssignedReplicasToThisPartition.mkString(",")))
+         if(liveAssignedReplicasToThisPartition.isEmpty)
+           throw new PartitionOfflineException(("No replica for partition " +
+             "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
+             " Assigned replicas are: [%s]".format(assignedReplicas))
+         // fallback: a live replica outside the isr becomes leader and is the only isr member
+         val newLeader = liveAssignedReplicasToThisPartition.head
+         warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
+           "There's potential data loss")
+         new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
+       }
+     info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+     // update the new leadership decision in zookeeper or retry
+     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+       newLeaderAndIsr.toString, latestLeaderAndIsr.zkVersion)
+     if(updateSucceeded)
+       newLeaderAndIsr.zkVersion = newVersion
+     else {
+       debug("Conditional update of leader and ISR for partition [%s, %d] failed, re-reading it from zookeeper"
+         .format(topic, partition))
+       // refresh the basis of the next attempt; without this every retry would use the same zkVersion
+       latestLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+         case Some(refreshedLeaderAndIsr) => refreshedLeaderAndIsr
+         case None => throw new KafkaException(("While electing a leader, " +
+           "there's no leaderAndISR information for partition (%s, %d) in zookeeper").format(topic, partition))
+       }
+     }
+     zookeeperPathUpdateSucceeded = updateSucceeded
+   }
+   // update the leader cache
+   allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+   newLeaderAndIsr
+ }
+
+
+ /**
+ * Zookeeper child listener that reacts to a change in the list of registered brokers.
+ * While holding controllerLock it refreshes liveBrokers and liveBrokerIds, opens channels to newly added
+ * brokers, closes channels to deleted brokers and then calls onBrokerChange with the new broker ids.
+ */
class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[Controller " + config.brokerId + "], "
- def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
+ /**
+ * @param parentPath The zookeeper path whose children changed
+ * @param currentBrokerList The current children of that path; each entry is parsed as an integer broker id
+ */
+ def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
controllerLock synchronized {
- info("broker change listener triggered")
- val curChildrenSeq: Seq[String] = javaCurChildren
- val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
- val curBrokerIds = curBrokerIdsSeq.toSet
- val addedBrokerIds = curBrokerIds -- allBrokerIds
- val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
- val deletedBrokerIds = allBrokerIds -- curBrokerIds
- allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
- allBrokerIds = allBrokers.map(_.id)
- info("added brokers: %s, deleted brokers: %s, all brokers: %s".format(addedBrokerIds, deletedBrokerIds, allBrokerIds))
- addedBrokersSeq.foreach(controllerChannelManager.addBroker(_))
+ val curBrokerIds = currentBrokerList.map(_.toInt).toSet
+ // both differences are taken against the previous liveBrokerIds, so compute them before it is reassigned
+ val newBrokerIds = curBrokerIds -- liveBrokerIds
+ val deletedBrokerIds = liveBrokerIds -- curBrokerIds
+ // getBrokerInfo returns an Option; flatMap keeps only the brokers whose info was found
+ liveBrokers = curBrokerIds.flatMap(ZkUtils.getBrokerInfo(zkClient, _))
+ liveBrokerIds = liveBrokers.map(_.id)
+ // reuse the info just read instead of a second zookeeper lookup, so newBrokers is always a subset of liveBrokers
+ val newBrokers = liveBrokers.filter(b => newBrokerIds.contains(b.id))
+ info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+ .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
+ newBrokers.foreach(controllerChannelManager.addBroker(_))
deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
- onBrokerChange(addedBrokerIds)
+ onBrokerChange(newBrokerIds)
}
}
}
@@ -465,7 +528,7 @@ class KafkaController(config : KafkaConf
private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
// get relevant partitions to this broker
val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => topics.contains(p._1._1))
- trace("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
+ debug("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
initLeaders(partitionReplicaAssignment)
}
@@ -479,24 +542,24 @@ class KafkaController(config : KafkaConf
}
allLeaders.remove(topicPartition)
info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap))
- ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2))
+ ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
}
for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
sendRequest(broker, stopReplicaRequest)
}
- /*TODO: kafka-330 remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
+ /* TODO: kafka-330 remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
}
class TopicChangeListener extends IZkChildListener with Logging {
this.logIdent = "[Controller " + config.brokerId + "], "
@throws(classOf[Exception])
- def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+ def handleChildChange(parentPath : String, children : java.util.List[String]) {
controllerLock synchronized {
info("topic/partition change listener fired for path " + parentPath)
- val currentChildren = JavaConversions.asBuffer(curChilds).toSet
+ val currentChildren = JavaConversions.asBuffer(children).toSet
val newTopics = currentChildren -- allTopics
val deletedTopics = allTopics -- currentChildren
val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1))
@@ -512,7 +575,7 @@ class KafkaController(config : KafkaConf
}
}
- class ControllerExistListener extends IZkDataListener with Logging {
+ class ControllerExistsListener extends IZkDataListener with Logging {
this.logIdent = "[Controller " + config.brokerId + "], "
@throws(classOf[Exception])
@@ -523,7 +586,7 @@ class KafkaController(config : KafkaConf
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
controllerLock synchronized {
- info("the current controller failed, competes to be new controller")
+ info("Current controller failed, participating in election for a new controller")
controllerRegisterOrFailover()
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Mon Sep 10 17:09:52 2012
@@ -22,7 +22,7 @@ import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils._
import kafka.log.LogManager
-import kafka.api.{LeaderAndISRRequest, LeaderAndISR}
+import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
object ReplicaManager {
@@ -114,7 +114,7 @@ class ReplicaManager(val config: KafkaCo
}
}
- def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndISRRequest): collection.Map[(String, Int), Short] = {
+ def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
info("Handling leader and isr request %s".format(leaderAndISRRequest))
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
@@ -141,7 +141,7 @@ class ReplicaManager(val config: KafkaCo
* If IsInit flag is on, this means that the controller wants to treat topics not in the request
* as deleted.
*/
- if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
+ if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
startHighWaterMarksCheckPointThread
val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
@@ -151,7 +151,7 @@ class ReplicaManager(val config: KafkaCo
responseMap
}
- private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
+ private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId)
if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
@@ -163,7 +163,7 @@ class ReplicaManager(val config: KafkaCo
info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
}
- private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
+ private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
val leaderBrokerId: Int = leaderAndISR.leader
info("Starting the follower state transition to follow leader %d for topic %s partition %d"
.format(leaderBrokerId, topic, partitionId))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Mon Sep 10 17:09:52 2012
@@ -62,18 +62,17 @@ object UpdateOffsetsInZK {
"getOffsetsBefore request")
}
- val brokerInfos = ZkUtils.getBrokerInfoFromIds(zkClient, List(broker))
- if(brokerInfos.size == 0)
- throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
+ ZkUtils.getBrokerInfo(zkClient, broker) match {
+ case Some(brokerInfo) =>
+ val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+ val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
+ val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
- val brokerInfo = brokerInfos.head
- val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
- val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
- val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-
- println("updating partition " + partition + " with new offset: " + offsets(0))
- ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offsets(0).toString)
- numParts += 1
+ println("updating partition " + partition + " with new offset: " + offsets(0))
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offsets(0).toString)
+ numParts += 1
+ case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
+ }
}
println("updated the offset for " + numParts + " partitions")
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Sep 10 17:09:52 2012
@@ -26,7 +26,6 @@ import java.util.zip.CRC32
import javax.management._
import scala.collection._
import scala.collection.mutable
-import kafka.message.{NoCompressionCodec, CompressionCodec}
import org.I0Itec.zkclient.ZkClient
import java.util.{Random, Properties}
import joptsimple.{OptionSpec, OptionSet, OptionParser}