You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 00:51:01 UTC
svn commit: r1367811 [2/4] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/
main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/
main/scala/kafka/network/ main/sca...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Tue Jul 31 22:50:59 2012
@@ -16,11 +16,12 @@
*/
package kafka.server
+import kafka.common.KafkaZookeeperClient
import collection.mutable.HashMap
-import collection._
import collection.immutable.Set
import kafka.cluster.Broker
import kafka.api._
+import java.lang.Object
import kafka.network.{Receive, BlockingChannel}
import kafka.utils.{ZkUtils, Logging}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
@@ -28,46 +29,36 @@ import org.I0Itec.zkclient.exception.ZkN
import java.util.concurrent.atomic.AtomicBoolean
import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
import org.apache.zookeeper.Watcher.Event.KeeperState
-import collection.JavaConversions._
-import java.lang.Object
-import java.nio.channels.AsynchronousCloseException
-class RequestSendThread(val controllerId: Int,
- val toBrokerId: Int,
+class RequestSendThread(val brokerId: Int,
val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
val channel: BlockingChannel)
- extends Thread("requestSendThread-" + toBrokerId) with Logging {
- this.logIdent = "Controller %d, request send thread to broker %d, ".format(controllerId, toBrokerId)
+ extends Thread("requestSendThread-" + brokerId) with Logging {
val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutDownLatch = new CountDownLatch(1)
- private val lock = new Object()
+ private val lock = new Object
def shutDown(): Unit = {
- info("shutting down")
+ info("Shutting down controller request send thread to broker %d".format(brokerId))
isRunning.set(false)
interrupt()
shutDownLatch.await()
- info("shutted down completed")
+ info("Controller request send thread to broker %d shutting down completed".format(brokerId))
}
override def run(): Unit = {
try{
+ info("In controller, thread for broker: " + brokerId + " started running")
while(isRunning.get()){
val queueItem = queue.take()
val request = queueItem._1
val callback = queueItem._2
var receive: Receive = null
- try{
- lock synchronized {
- channel.send(request)
- receive = channel.receive()
- }
- } catch {
- case e =>
- // log it and let it go. Let controller shut it down.
- debug("Exception occurs", e)
+ lock synchronized {
+ channel.send(request)
+ receive = channel.receive()
}
var response: RequestOrResponse = null
@@ -77,15 +68,13 @@ class RequestSendThread(val controllerId
case RequestKeys.StopReplicaRequest =>
response = StopReplicaResponse.readFrom(receive.buffer)
}
- trace("got a response %s".format(controllerId, response, toBrokerId))
-
if(callback != null){
callback(response)
}
}
} catch{
- case e: InterruptedException => warn("intterrupted. Shutting down")
- case e1 => error("Error due to ", e1)
+ case e: InterruptedException => warn("Controller request send thread to broker %d is intterrupted. Shutting down".format(brokerId))
+ case e1 => error("Error in controller request send thread to broker %d down due to ".format(brokerId), e1)
}
shutDownLatch.countDown()
}
@@ -96,10 +85,9 @@ class ControllerChannelManager(allBroker
private val messageChannels = new HashMap[Int, BlockingChannel]
private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
private val messageThreads = new HashMap[Int, RequestSendThread]
- this.logIdent = "Channel manager on controller " + config.brokerId + ", "
for(broker <- allBrokers){
brokers.put(broker.id, broker)
- info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
+ info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
@@ -111,7 +99,7 @@ class ControllerChannelManager(allBroker
def startUp() = {
for((brokerId, broker) <- brokers){
- val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId), messageChannels(brokerId))
+ val thread = new RequestSendThread(brokerId, messageQueues(brokerId), messageChannels(brokerId))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
@@ -131,13 +119,14 @@ class ControllerChannelManager(allBroker
def addBroker(broker: Broker){
brokers.put(broker.id, broker)
messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+ info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
messageChannels.put(broker.id, channel)
- val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
+ val thread = new RequestSendThread(broker.id, messageQueues(broker.id), messageChannels(broker.id))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
@@ -157,62 +146,38 @@ class ControllerChannelManager(allBroker
}
}
-class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
- this.logIdent = "Controller " + config.brokerId + ", "
- info("startup");
- private val controllerLock = new Object
+class KafkaController(config : KafkaConfig) extends Logging {
+ info("controller startup");
+ private val lock = new Object
+
+ private var zkClient: ZkClient = null
private var controllerChannelManager: ControllerChannelManager = null
private var allBrokers : Set[Broker] = null
- private var allBrokerIds : Set[Int] = null
private var allTopics: Set[String] = null
- private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
- private var allLeaders: mutable.Map[(String, Int), Int] = null
- // Return true if this controller succeeds in the controller competition
- private def tryToBecomeController(): Boolean = {
- try {
- ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
- // Only the broker successfully registering as the controller can execute following code, otherwise
- // some exception will be thrown.
- registerBrokerChangeListener()
- registerTopicChangeListener()
- allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
- allBrokerIds = allBrokers.map(_.id)
- info("all brokers: %s".format(allBrokerIds))
- allTopics = ZkUtils.getAllTopics(zkClient).toSet
- info("all topics: %s".format(allTopics))
- allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
- info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment))
- allLeaders = new mutable.HashMap[(String, Int), Int]
- controllerChannelManager = new ControllerChannelManager(allBrokers, config)
- controllerChannelManager.startUp()
- return true
- } catch {
- case e: ZkNodeExistsException =>
- registerControllerExistListener()
- info("broker didn't succeed registering as the controller since it's taken by someone else")
- return false
- case e2 => throw e2
- }
- }
-
- private def controllerRegisterOrFailover(){
- info("try to become controller")
- if(tryToBecomeController() == true){
- info("won the controller competition and work on leader and isr recovery")
- deliverLeaderAndISRFromZookeeper(allBrokerIds, allTopics)
- debug("work on broker changes")
- onBrokerChange()
-
- // If there are some partition with leader not initialized, init the leader for them
- val partitionReplicaAssignment = allPartitionReplicaAssignment.clone()
- for((topicPartition, replicas) <- partitionReplicaAssignment){
- if (allLeaders.contains(topicPartition)){
- partitionReplicaAssignment.remove(topicPartition)
+ private def tryToBecomeController() = {
+ lock synchronized {
+ val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+ if (curController == null){
+ try {
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString())
+
+ // Only the broker successfully registering as the controller can execute following code, otherwise
+ // some exception will be thrown.
+ registerBrokerChangeListener()
+ registerTopicChangeListener()
+ allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+ allTopics = ZkUtils.getAllTopics(zkClient).toSet
+ controllerChannelManager = new ControllerChannelManager(allBrokers, config)
+ controllerChannelManager.startUp()
+ } catch {
+ case e: ZkNodeExistsException =>
+ registerControllerExistListener()
+ info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else")
+ case e2 => throw e2
}
}
- debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
- initLeaders(partitionReplicaAssignment)
+ else info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController)
}
}
@@ -221,22 +186,17 @@ class KafkaController(config : KafkaConf
}
def startup() = {
- controllerLock synchronized {
- registerSessionExpirationListener()
- registerControllerExistListener()
- controllerRegisterOrFailover()
- }
+ zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+ registerSessionExpirationListener()
+ registerControllerExistListener()
+ tryToBecomeController()
}
def shutDown() = {
- controllerLock synchronized {
- if(controllerChannelManager != null){
- info("shut down")
- controllerChannelManager.shutDown()
- controllerChannelManager = null
- info("shutted down completely")
- }
- }
+ if(controllerChannelManager != null)
+ controllerChannelManager.shutDown()
+ if(zkClient != null)
+ zkClient.close()
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
@@ -259,8 +219,7 @@ class KafkaController(config : KafkaConf
zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener())
}
- class SessionExpireListener() extends IZkStateListener with Logging {
- this.logIdent = "Controller " + config.brokerId + ", "
+ class SessionExpireListener() extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
@@ -275,256 +234,50 @@ class KafkaController(config : KafkaConf
*/
@throws(classOf[Exception])
def handleNewSession() {
- controllerLock synchronized {
- info("session expires, clean up the state")
- controllerChannelManager.shutDown()
- controllerChannelManager = null
- controllerRegisterOrFailover()
- }
- }
- }
-
- /**
- * Used to populate the leaderAndISR from zookeeper to affected brokers when the brokers comes up
- */
- private def deliverLeaderAndISRFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
- val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRForTopics(zkClient, topics.iterator)
- val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
- for((topicPartition, leaderAndISR) <- leaderAndISRInfos){
- // If the leader specified in the leaderAndISR is no longer alive, there is no need to recover it
- if(allBrokerIds.contains(leaderAndISR.leader)){
- val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
- if(brokersAssignedToThisPartitionOpt == None){
- warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
- } else{
- val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
- relatedBrokersAssignedToThisPartition.foreach(b => {
- if(!brokerToLeaderAndISRInfosMap.contains(b))
- brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
- brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
- })
- allLeaders.put(topicPartition, leaderAndISR.leader)
- }
- } else
- debug("during leaderAndISR recovery, the leader %d is not alive any more, just ignore it".format(leaderAndISR.leader))
- }
- info("during leaderAndISR recovery, the broker to request map is [%s]".format(brokerToLeaderAndISRInfosMap.toString()))
-
- brokerToLeaderAndISRInfosMap.foreach(m =>{
- val broker = m._1
- val leaderAndISRs = m._2
- val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRs)
- info("during leaderAndISR recovery, the leaderAndISRRequest sent to new broker [%s] is [%s]".format(broker, leaderAndISRRequest.toString))
- sendRequest(broker, leaderAndISRRequest)
- })
-
- info("after leaderAndISR recovery for brokers %s, the leaders assignment is %s".format(brokerIds, allLeaders))
- }
-
-
- private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) {
- val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndISR]]
- for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) {
- val liveAssignedReplicas = replicaAssignment.filter(r => allBrokerIds.contains(r))
- debug("for topic [%s], partition [%d], live assigned replicas are: [%s]"
- .format(topicPartition._1,
- topicPartition._2,
- liveAssignedReplicas))
- if(!liveAssignedReplicas.isEmpty){
- debug("live assigned replica is not empty, check zkClient: %s".format(zkClient))
- val leader = liveAssignedReplicas.head
- var leaderAndISR: LeaderAndISR = null
- var updateLeaderISRZKPathSucceeded: Boolean = false
- while(!updateLeaderISRZKPathSucceeded){
- val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
- debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient))
- if(curLeaderAndISROpt == None){
- debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2))
- leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList)
- ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
- updateLeaderISRZKPathSucceeded = true
- } else{
- debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is not empty".format(topicPartition._1, topicPartition._2))
- val curZkPathVersion = curLeaderAndISROpt.get.zkVersion
- leaderAndISR = new LeaderAndISR(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList, curLeaderAndISROpt.get.zkVersion + 1)
- val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
- if(updateSucceeded){
- leaderAndISR.zkVersion = newVersion
- }
- updateLeaderISRZKPathSucceeded = updateSucceeded
- }
- }
- liveAssignedReplicas.foreach(b => {
- if(!brokerToLeaderAndISRInfosMap.contains(b))
- brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
- brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
- }
- )
- allLeaders.put(topicPartition, leaderAndISR.leader)
- }
- else{
- warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment, allBrokerIds))
- }
+ info("Controller session expires, clean up the state, current controller: " + config.brokerId)
+ controllerChannelManager.shutDown()
+ controllerChannelManager = null
+ info("Controller session expires, the channel manager shut downr: " + config.brokerId)
+ tryToBecomeController()
}
-
- info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfosMap))
- brokerToLeaderAndISRInfosMap.foreach(m =>{
- val broker = m._1
- val leaderAndISRs = m._2
- val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRs)
- info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest))
- sendRequest(broker, leaderAndISRRequest)
- })
- }
-
-
- private def onBrokerChange(newBrokers: Set[Int] = null){
- /** handle the new brokers, send request for them to initialize the local log **/
- if(newBrokers != null)
- deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
-
- /** handle leader election for the partitions whose leader is no longer alive **/
- val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
- allLeaders.foreach(m =>{
- val topicPartition = m._1
- val leader = m._2
- // We only care about the partitions, whose leader is no longer alive
- if(!allBrokerIds.contains(leader)){
- var updateLeaderISRZKPathSucceeded: Boolean = false
- while(!updateLeaderISRZKPathSucceeded){
- val assignedReplicasOpt = allPartitionReplicaAssignment.get(topicPartition)
- if(assignedReplicasOpt == None)
- throw new IllegalStateException("On broker changes, the assigned replica for [%s, %d], shouldn't be None, the general assignment is %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
- val assignedReplicas = assignedReplicasOpt.get
- val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => allBrokerIds.contains(r))
- val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
- if(curLeaderAndISROpt == None){
- throw new IllegalStateException("On broker change, there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topicPartition._1, topicPartition._2))
- }
- val curLeaderAndISR = curLeaderAndISROpt.get
- val leader = curLeaderAndISR.leader
- var newLeader: Int = -1
- val leaderEpoch = curLeaderAndISR.leaderEpoch
- val ISR = curLeaderAndISR.ISR
- val curZkPathVersion = curLeaderAndISR.zkVersion
- debug("leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]".format(topicPartition._1, topicPartition._2, leader, leaderEpoch, ISR, curZkPathVersion))
- // The leader is no longer alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader
- var leaderAndISR: LeaderAndISR = null
- // The ISR contains at least 1 broker in the live broker list
- val liveBrokersInISR = ISR.filter(r => allBrokerIds.contains(r))
- if(!liveBrokersInISR.isEmpty){
- newLeader = liveBrokersInISR.head
- leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch +1, liveBrokersInISR.toList, curZkPathVersion + 1)
- debug("some broker in ISR is alive, new leader and ISR is %s".format(leaderAndISR.toString()))
- } else{
- debug("live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicasToThisPartition))
- if (!liveAssignedReplicasToThisPartition.isEmpty){
- newLeader = liveAssignedReplicasToThisPartition.head
- leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch + 1, List(newLeader), curZkPathVersion + 1)
- warn("on broker change, no broker in ISR is alive, new leader elected is [%s], there's potential data loss".format(newLeader))
- } else
- error("on broker change, for partition ([%s, %d]), live brokers are: [%s], assigned replicas are: [%s]; no asigned replica is alive".format(topicPartition._1, topicPartition._2, allBrokerIds, assignedReplicas))
- }
- debug("the leader and ISR converted string: [%s]".format(leaderAndISR))
- val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
- if(updateSucceeded){
- leaderAndISR.zkVersion = newVersion
- liveAssignedReplicasToThisPartition.foreach(b => {
- if(!brokerToLeaderAndISRInfosMap.contains(b))
- brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
- brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
- })
- allLeaders.put(topicPartition, newLeader)
- info("on broker changes, allLeader is updated to %s".format(allLeaders))
- }
- updateLeaderISRZKPathSucceeded = updateSucceeded
- }
- }
- })
- trace("after acting on broker change, the broker to leaderAndISR request map is".format(brokerToLeaderAndISRInfosMap))
- brokerToLeaderAndISRInfosMap.foreach(m => {
- val broker = m._1
- val leaderAndISRInfos = m._2
- val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
- sendRequest(broker, leaderAndISRRequest)
- info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(leaderAndISRRequest, broker))
- })
}
class BrokerChangeListener() extends IZkChildListener with Logging {
- this.logIdent = "Controller " + config.brokerId + ", "
def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
- controllerLock synchronized {
- info("broker change listener triggered")
+ import scala.collection.JavaConversions._
+ lock synchronized {
+ info("Broker change listener at controller triggerred")
+ val allBrokerIds = allBrokers.map(_.id)
val curChildrenSeq: Seq[String] = javaCurChildren
val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
val curBrokerIds = curBrokerIdsSeq.toSet
val addedBrokerIds = curBrokerIds -- allBrokerIds
val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
+ info("Added brokers: " + addedBrokerIds.toString())
val deletedBrokerIds = allBrokerIds -- curBrokerIds
- allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
- allBrokerIds = allBrokers.map(_.id)
- info("added brokers: %s, deleted brokers: %s, all brokers: %s".format(addedBrokerIds, deletedBrokerIds, allBrokerIds))
- addedBrokersSeq.foreach(controllerChannelManager.addBroker(_))
- deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
- onBrokerChange(addedBrokerIds)
- }
- }
- }
+ info("Deleted brokers: " + deletedBrokerIds.toString())
- private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
- // get relevant partitions to this broker
- val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => topics.contains(p._1._1))
- trace("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
- initLeaders(partitionReplicaAssignment)
- }
+ allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
- private def handleDeletedTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
- val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
- for((topicPartition, brokers) <- partitionReplicaAssignment){
- for (broker <- brokers){
- if (!brokerToPartitionToStopReplicaMap.contains(broker))
- brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
- brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
+ for(broker <- addedBrokersSeq){
+ controllerChannelManager.addBroker(broker)
+ }
+ for (brokerId <- deletedBrokerIds){
+ controllerChannelManager.removeBroker(brokerId)
+ }
+ /** TODO: add other broker change handler logic**/
}
- allLeaders.remove(topicPartition)
- info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap))
- ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2))
- }
- for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
- val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
- info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
- sendRequest(broker, stopReplicaRequest)
}
- /*TODO: kafka-330 remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
}
class TopicChangeListener extends IZkChildListener with Logging {
- this.logIdent = "Controller " + config.brokerId + ", "
-
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
- controllerLock synchronized {
- info("topic/partition change listener fired for path " + parentPath)
- val currentChildren = JavaConversions.asBuffer(curChilds).toSet
- val newTopics = currentChildren -- allTopics
- val deletedTopics = allTopics -- currentChildren
- val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1))
- allTopics = currentChildren
-
- val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.iterator)
- allPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
- allPartitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
- info("new topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, allPartitionReplicaAssignment))
- handleNewTopics(newTopics, addedPartitionReplicaAssignment)
- handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
- }
+ // TODO: Incomplete, do not need to review this time
}
}
class ControllerExistListener extends IZkDataListener with Logging {
- this.logIdent = "Controller " + config.brokerId + ", "
-
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
// do nothing, since No logic is needed here
@@ -532,10 +285,8 @@ class KafkaController(config : KafkaConf
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
- controllerLock synchronized {
- info("the current controller failed, competes to be new controller")
- controllerRegisterOrFailover()
- }
+ info("Controller fail over, broker " + config.brokerId + " try to become controller")
+ tryToBecomeController()
}
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Tue Jul 31 22:50:59 2012
@@ -24,44 +24,41 @@ import java.util.concurrent.atomic.Atomi
/**
* A thread that answers kafka requests.
*/
-class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
- this.logIdent = "Kafka Request Handler " + id + " on Broker " + brokerId + ", "
-
+class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
+
def run() {
while(true) {
val req = requestChannel.receiveRequest()
- if(req == RequestChannel.AllDone){
- trace("receives shut down command, shut down".format(brokerId, id))
+ trace("Processor " + Thread.currentThread.getName + " got request " + req)
+ if(req == RequestChannel.AllDone)
return
- }
- debug("handles request " + req)
apis.handle(req)
}
}
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
+
}
-class KafkaRequestHandlerPool(val brokerId: Int,
- val requestChannel: RequestChannel,
- val apis: KafkaApis,
+class KafkaRequestHandlerPool(val requestChannel: RequestChannel,
+ val apis: KafkaApis,
numThreads: Int) extends Logging {
- this.logIdent = "Kafka Request Handler on Broker " + brokerId + ", "
+
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
- runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
+ runnables(i) = new KafkaRequestHandler(requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}
def shutdown() {
- info("shutting down")
+ info("Shutting down request handlers")
for(handler <- runnables)
handler.shutdown
for(thread <- threads)
thread.join
- info("shutted down completely")
+ info("Request handlers shut down")
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -24,9 +24,8 @@ import kafka.utils._
import java.util.concurrent._
import atomic.AtomicBoolean
import kafka.cluster.Replica
-import kafka.api.LeaderAndISR
-import scala.collection._
import org.I0Itec.zkclient.ZkClient
+import kafka.common.KafkaZookeeperClient
/**
@@ -34,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
- this.logIdent = "Kafka Server " + config.brokerId + ", "
+
val CleanShutdownFile = ".kafka_cleanshutdown"
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
@@ -45,7 +44,7 @@ class KafkaServer(val config: KafkaConfi
var kafkaZookeeper: KafkaZooKeeper = null
var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
- var kafkaController: KafkaController = null
+ var kafkaController: KafkaController = new KafkaController(config)
val kafkaScheduler = new KafkaScheduler(4)
var zkClient: ZkClient = null
@@ -54,7 +53,7 @@ class KafkaServer(val config: KafkaConfi
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
- info("starting")
+ info("Starting Kafka server..." + config.brokerId)
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
var needRecovery = true
@@ -63,10 +62,11 @@ class KafkaServer(val config: KafkaConfi
needRecovery = false
cleanShutDownFile.delete
}
-
+ /* start client */
+ info("Connecting to ZK: " + config.zkConnect)
+ zkClient = KafkaZookeeperClient.getZookeeperClient(config)
/* start scheduler */
kafkaScheduler.startUp
-
/* start log manager */
logManager = new LogManager(config,
kafkaScheduler,
@@ -75,107 +75,88 @@ class KafkaServer(val config: KafkaConfi
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)
logManager.startup()
-
- socketServer = new SocketServer(config.brokerId,
- config.port,
+
+ socketServer = new SocketServer(config.port,
config.numNetworkThreads,
config.monitoringPeriodSecs,
config.numQueuedRequests,
config.maxSocketRequestSize)
-
- socketServer.startup
-
Utils.registerMBean(socketServer.stats, statsMBeanName)
- /* start client */
- kafkaZookeeper = new KafkaZooKeeper(config)
- // starting relevant replicas and leader election for partitions assigned to this broker
- kafkaZookeeper.startup
+ kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
- info("Connecting to ZK: " + config.zkConnect)
+ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler)
- replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, deleteLog)
+ apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
+ requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
+ socketServer.startup()
- kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
- apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper,
- addReplica, stopReplica, makeLeader, makeFollower, config.brokerId)
- requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
Mx4jLoader.maybeLoad
- /**
- * Registers this broker in ZK. After this, consumers can connect to broker.
- * So this should happen after socket server start.
- */
+ // starting relevant replicas and leader election for partitions assigned to this broker
+ kafkaZookeeper.startup()
// start the replica manager
replicaManager.startup()
// start the controller
kafkaController.startup()
- info("started")
- }
-
+ info("Server started.")
+ }
+
/**
* Shutdown API for shutting down a single instance of the Kafka server.
* Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
*/
def shutdown() {
- info("shutting down")
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
- if(requestHandlerPool != null)
- requestHandlerPool.shutdown()
+ info("Shutting down Kafka server with id " + config.brokerId)
kafkaScheduler.shutdown()
apis.close()
- kafkaZookeeper.shutdown()
if(replicaManager != null)
replicaManager.shutdown()
if (socketServer != null)
socketServer.shutdown()
+ if(requestHandlerPool != null)
+ requestHandlerPool.shutdown()
Utils.unregisterMBean(statsMBeanName)
if(logManager != null)
logManager.shutdown()
-
if(kafkaController != null)
kafkaController.shutDown()
-
+ kafkaZookeeper.shutdown()
+ zkClient.close()
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
- debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
+ debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
cleanShutDownFile.createNewFile
shutdownLatch.countDown()
- info("shutted down completed")
+ info("Kafka server with id %d shut down completed".format(config.brokerId))
}
}
-
+
/**
* After calling shutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = shutdownLatch.await()
def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
+ info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
+ // get local log
val log = logManager.getOrCreateLog(topic, partition)
replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
}
- def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
- replicaManager.makeLeader(replica, leaderAndISR)
+ def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+ replicaManager.makeLeader(replica, currentISRInZk)
}
- def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
- replicaManager.makeFollower(replica, leaderAndISR)
+ def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+ replicaManager.makeFollower(replica, leaderBrokerId, zkClient)
}
def getReplica(topic: String, partition: Int): Option[Replica] =
replicaManager.getReplica(topic, partition)
- def stopReplica(topic: String, partition: Int): Short = {
- replicaManager.stopReplica(topic, partition)
- }
-
- def deleteLog(topic: String, partition: Int): Unit = {
- /* TODO: handle deleteLog in a better way */
- //logManager.deleteLog(topic, partition)
- }
-
def getLogManager(): LogManager = logManager
def getStats(): SocketServerStats = socketServer.stats
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Jul 31 22:50:59 2012
@@ -18,29 +18,46 @@
package kafka.server
import java.net.InetAddress
+import kafka.cluster.Replica
import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import kafka.admin.AdminUtils
+import java.lang.Thread
+import collection.mutable.HashSet
import kafka.common._
-
/**
* Handles the server's interaction with zookeeper. The server needs to register the following paths:
* /topics/[topic]/[node_id-partition_num]
* /brokers/[0...N] --> host:port
+ *
*/
-class KafkaZooKeeper(config: KafkaConfig) extends Logging {
+class KafkaZooKeeper(config: KafkaConfig,
+ zkClient: ZkClient,
+ addReplicaCbk: (String, Int, Set[Int]) => Replica,
+ getReplicaCbk: (String, Int) => Option[Replica],
+ becomeLeader: (Replica, Seq[Int]) => Unit,
+ becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
- private var zkClient: ZkClient = null
-
- def startup() {
- /* start client */
- info("connecting to ZK: " + config.zkConnect)
- zkClient = KafkaZookeeperClient.getZookeeperClient(config)
- zkClient.subscribeStateChanges(new SessionExpireListener)
- registerBrokerInZk()
- }
+ private var leaderChangeListener: LeaderChangeListener = null
+ private var topicPartitionsChangeListener: TopicChangeListener = null
+ private var stateChangeHandler: StateChangeCommandHandler = null
+
+ private val topicListenerLock = new Object
+ private val leaderChangeLock = new Object
+
+  def startup() {
+    // Create the listeners before any ZK subscription below references them. Constructing a
+    // TopicChangeListener reads the current topic list from ZK, so each is created exactly once.
+    leaderChangeListener = new LeaderChangeListener
+    topicPartitionsChangeListener = new TopicChangeListener
+    startStateChangeCommandHandler()
+    zkClient.subscribeStateChanges(new SessionExpireListener)
+    registerBrokerInZk()
+    subscribeToTopicAndPartitionsChanges(true)
+  }
private def registerBrokerInZk() {
info("Registering broker " + brokerIdPath)
@@ -49,6 +66,13 @@ class KafkaZooKeeper(config: KafkaConfig
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
}
+ private def startStateChangeCommandHandler() {
+ val stateChangeQ = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)
+ stateChangeHandler = new StateChangeCommandHandler("StateChangeCommandHandler", config, stateChangeQ,
+ ensureStateChangeCommandValidityOnThisBroker, ensureEpochValidity)
+ stateChangeHandler.start()
+ }
+
/**
* When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
* connection for us. We need to re-register this broker in the broker registry.
@@ -72,24 +96,20 @@ class KafkaZooKeeper(config: KafkaConfig
registerBrokerInZk()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+ zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
+ val topics = ZkUtils.getAllTopics(zkClient)
+ debug("Existing topics are %s".format(topics.mkString(",")))
+ topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
+ handleNewTopics(topics)
}
}
def shutdown() {
- if (zkClient != null) {
- info("Closing zookeeper client...")
- zkClient.close()
- }
- }
-
- private def doesTopicExistInCluster(topic: String) : Boolean = {
- val allTopics = ZkUtils.getAllTopics(zkClient)
- trace("all topics, %s, topic %s".format(allTopics, topic))
- allTopics.contains(topic)
+ stateChangeHandler.shutdown()
}
def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
- if(!doesTopicExistInCluster(topic))
+ if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
// check if partition id is invalid
if(partition < 0)
@@ -104,7 +124,256 @@ class KafkaZooKeeper(config: KafkaConfig
}
}
- def getZookeeperClient = {
- zkClient
+ def getZookeeperClient = zkClient
+
+ def handleNewTopics(topics: Seq[String]) {
+ // get relevant partitions to this broker
+ val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+ debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+ for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
+ // subscribe to leader changes for these partitions
+ subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
+ // start replicas for these partitions
+ startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
+ }
+ }
+
+ def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
+ info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+ zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
+ val topics = ZkUtils.getAllTopics(zkClient)
+ val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+ debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+ for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
+ // subscribe to leader changes for these partitions
+ subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
+
+ // start replicas for these partitions
+ if(startReplicas)
+ startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
+ }
+ }
+
+ private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) {
+ partitions.foreach { partition =>
+ info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition))
+ // register leader change listener
+ zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
+ }
+ }
+
+ private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
+ partitions.foreach { partition =>
+ val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+ info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
+ if(assignedReplicas.contains(config.brokerId)) {
+ val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
+ startReplica(replica)
+ } else
+ warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
+ .format(partition, topic, config.brokerId))
+ }
+ }
+
+ private def startReplica(replica: Replica) {
+ info("Starting replica for topic %s partition %d on broker %d"
+ .format(replica.topic, replica.partition.partitionId, replica.brokerId))
+ ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+ case Some(leader) =>
+ info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader))
+ // check if this broker is the leader, if not, then become follower
+ if(leader != config.brokerId)
+ becomeFollower(replica, leader, zkClient)
+ case None => // leader election
+ leaderElection(replica)
+ }
+ }
+
+  def leaderElection(replica: Replica) {
+    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
+    // read the AR list for replica.partition from ZK
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
+    val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
+    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) {
+      info("Broker %d will participate in leader election for topic %s partition %d"
+        .format(config.brokerId, replica.topic, replica.partition.partitionId))
+      // wait for some time if it is not the preferred replica
+      try {
+        if(replica.brokerId != assignedReplicas.head) {
+          // sleep only if the preferred replica is alive
+          if(liveBrokers.contains(assignedReplicas.head)) {
+            info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) +
+              "partition %d is alive. Waiting for %d ms to allow it to become leader"
+              .format(replica.partition.partitionId, config.preferredReplicaWaitTime))
+            Thread.sleep(config.preferredReplicaWaitTime)
+          }
+        }
+      } catch {
+        case e: InterruptedException => // woken before the wait elapsed; proceed with the election
+      }
+      val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
+        replica.partition.partitionId, replica.brokerId)
+      newLeaderEpochAndISR match {
+        case Some(epochAndISR) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic,
+            replica.partition.partitionId))
+          info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId,
+            epochAndISR._2.mkString(",")))
+          becomeLeader(replica, epochAndISR._2)
+        case None =>
+          ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+            case Some(leader) =>
+              becomeFollower(replica, leader, zkClient)
+            case None =>
+              error("Lost leader for topic %s partition %d right after leader election".format(replica.topic,
+                replica.partition.partitionId))
+          }
+      }
+    }
+  }
+
+  private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int],
+                              inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = {
+    // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
+    assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic" +
+      " %s partition %d".format(topic, partition))
+    inSyncReplicas.size > 0 match {
+      case true => // check if this broker is in the ISR. If yes, return true
+        inSyncReplicas.contains(brokerId) match {
+          case true =>
+            info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) +
+              " for topic %s partition %d".format(topic, partition))
+            true
+          case false =>
+            // check if any broker in the ISR is alive. If not, return true only if this broker is in the AR
+            val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r))
+            liveBrokersInISR.isEmpty match {
+              case true =>
+                if(assignedReplicas.contains(brokerId)) {
+                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+                    " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
+                      .format(partition, brokerId, assignedReplicas.mkString(",")))
+                  true
+                } else {
+                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+                    " partition %d is alive. Broker %d cannot become leader since it is not in the assigned replicas %s"
+                      .format(partition, brokerId, assignedReplicas.mkString(",")))
+                  false
+                }
+              case false => // some ISR member is alive and this broker is not in the ISR
+                info("ISR for topic %s partition %d is %s. Out of these %s brokers are alive. Broker %d "
+                  .format(topic, partition, inSyncReplicas.mkString(","), liveBrokersInISR.mkString(","), brokerId) +
+                  "cannot become leader since it doesn't exist in the ISR")
+                false  // let one of the live brokers in the ISR become the leader
+            }
+        }
+      case false =>
+        if(assignedReplicas.contains(brokerId)) {
+          info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
+            .format(topic, partition, brokerId) + "is part of the assigned replicas list")
+          true
+        } else {
+          info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
+            .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
+          false
+        }
+    }
+  }
+
+ class TopicChangeListener extends IZkChildListener with Logging {
+ private val allTopics = new HashSet[String]()
+ // read existing topics, if any
+ allTopics ++= ZkUtils.getAllTopics(zkClient)
+
+ @throws(classOf[Exception])
+ def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+ import collection.JavaConversions
+ topicListenerLock.synchronized {
+ debug("Topic/partition change listener fired for path " + parentPath)
+ val currentChildren = JavaConversions.asBuffer(curChilds).toSet
+ val newTopics = currentChildren -- allTopics
+ val deletedTopics = allTopics -- currentChildren
+ allTopics.clear()
+ allTopics ++= currentChildren
+
+ debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
+ debug("Current topics in the cluster: [%s]".format(allTopics.mkString(",")))
+ handleNewTopics(newTopics.toSeq)
+ // TODO: Handle topic deletions
+ // handleDeletedTopics(deletedTopics.toSeq)
+ }
+ }
+
+ def doesTopicExistInCluster(topic: String): Boolean = {
+ topicListenerLock.synchronized {
+ allTopics.contains(topic)
+ }
+ }
+ }
+
+ private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
+ // check if this broker hosts a replica for this topic and partition
+ ZkUtils.isPartitionOnBroker(zkClient, stateChangeCommand.topic, stateChangeCommand.partition, config.brokerId)
+ }
+
+ private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = {
+ // get the topic and partition that this request is meant for
+ val topic = stateChangeCommand.topic
+ val partition = stateChangeCommand.partition
+ val epoch = stateChangeCommand.epoch
+
+ val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
+ // check if the request's epoch matches the current leader's epoch OR the admin command's epoch
+ val validEpoch = (currentLeaderEpoch == epoch) || (epoch == AdminUtils.AdminEpoch)
+ if(epoch > currentLeaderEpoch)
+ throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
+ "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
+ validEpoch
+ }
+
+ class LeaderChangeListener extends IZkDataListener with Logging {
+
+ @throws(classOf[Exception])
+ def handleDataChange(dataPath: String, data: Object) {
+ // handle leader change event for path
+ val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
+ val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
+ val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
+ debug("Leader change listener fired on broker %d for path %s. New leader is %d. New epoch is %d".format(config.brokerId,
+ dataPath, newLeader, newEpoch))
+ val topicPartitionInfo = dataPath.split("/")
+ val topic = topicPartitionInfo.takeRight(4).head
+ val partition = topicPartitionInfo.takeRight(2).head.toInt
+ info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
+ val replica = getReplicaCbk(topic, partition).getOrElse(null)
+ assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
+ .format(topic, partition, config.brokerId))
+ replica.partition.leaderId(Some(newLeader))
+ assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
+ }
+
+ @throws(classOf[Exception])
+ def handleDataDeleted(dataPath: String) {
+ leaderChangeLock.synchronized {
+ // leader is deleted for topic partition
+ val topic = dataPath.split("/").takeRight(4).head
+ val partitionId = dataPath.split("/").takeRight(2).head.toInt
+ debug("Leader deleted listener fired for topic %s partition %d on broker %d"
+ .format(topic, partitionId, config.brokerId))
+ val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
+ if(assignedReplicas.contains(config.brokerId)) {
+ val replica = getReplicaCbk(topic, partitionId)
+ replica match {
+ case Some(r) => leaderElection(r)
+ case None => error("No replica exists for topic %s partition %s on broker %d"
+ .format(topic, partitionId, config.brokerId))
+ }
+ }
+ }
+ }
}
}
+
+
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Tue Jul 31 22:50:59 2012
@@ -20,10 +20,10 @@ package kafka.server
import kafka.cluster.Broker
class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
- extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
+ extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
- new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+ new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
}
def shutdown() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue Jul 31 22:50:59 2012
@@ -18,88 +18,64 @@ package kafka.server
import kafka.log.Log
import kafka.cluster.{Partition, Replica}
-import collection._
+import collection.mutable
import mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
-import kafka.api.LeaderAndISR
-import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{BrokerNotExistException, KafkaException, ErrorMapping, InvalidPartitionException}
+import kafka.common.{KafkaException, InvalidPartitionException}
+class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler)
+ extends Logging {
-class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, deleteLocalLog: (String, Int) => Unit) extends Logging {
-
- var allPartitions = new mutable.HashMap[(String, Int), Partition]()
+ private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
private var leaderReplicas = new ListBuffer[Partition]()
private val leaderReplicaLock = new ReentrantLock()
private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
- this.logIdent = "Replica Manager on Broker " + config.brokerId + ", "
-
- val hwCheckPointThreadStarted = new AtomicBoolean(false)
private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
- info("Created highwatermark file %s".format(highwaterMarkCheckpoint.name))
-
- def startHighWaterMarksCheckPointThread() = {
- if(hwCheckPointThreadStarted.compareAndSet(false, true))
- kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
- }
+ info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
def startup() {
// start the highwatermark checkpoint thread
+ kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0,
+ config.defaultFlushIntervalMs)
// start ISR expiration thread
kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
}
def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
- var retReplica : Replica = null
+ val localReplica = new Replica(config.brokerId, partition, topic, time,
+ Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
+
val replicaOpt = partition.getReplica(config.brokerId)
replicaOpt match {
case Some(replica) =>
- info("changing remote replica %s into a local replica".format(replica.toString))
+ info("Changing remote replica %s into a local replica".format(replica.toString))
replica.log match {
case None =>
replica.log = Some(log)
case Some(log) => // nothing to do since log already exists
}
- retReplica = replica
case None =>
- val localReplica = new Replica(config.brokerId, partition, topic, time,
- Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
partition.addReplica(localReplica)
- info("adding local replica %d for topic %s partition %s on broker %d".format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
- retReplica = localReplica
}
val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
partition.assignedReplicas(Some(assignedReplicas))
// get the replica objects for the assigned replicas for this partition
- retReplica
+ info("Added local replica %d for topic %s partition %s on broker %d"
+ .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+ localReplica
}
- def stopReplica(topic: String, partition: Int): Short = {
- trace("handling stop replica for partition [%s, %d]".format(topic, partition))
- val errorCode = ErrorMapping.NoError
- val replica = getReplica(topic, partition)
- if(replica.isDefined){
- replicaFetcherManager.removeFetcher(topic, partition)
- deleteLocalLog(topic, partition)
- allPartitions.remove((topic, partition))
- info("after removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partition, allPartitions))
- }
- trace("finishes handling stop replica [%s, %d]".format(topic, partition))
- errorCode
- }
-
-
def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
- val newPartition = allPartitions.contains((topic, partitionId))
+ val newPartition = allReplicas.contains((topic, partitionId))
newPartition match {
case true => // partition exists, do nothing
- allPartitions.get((topic, partitionId)).get
+ allReplicas.get((topic, partitionId)).get
case false => // create remote replicas for each replica id in assignedReplicas
val partition = new Partition(topic, partitionId, time)
- allPartitions += (topic, partitionId) -> partition
+ allReplicas += (topic, partitionId) -> partition
(assignedReplicaIds - config.brokerId).foreach(
replicaId => addRemoteReplica(topic, partitionId, replicaId, partition))
partition
@@ -107,11 +83,12 @@ class ReplicaManager(val config: KafkaCo
}
def ensurePartitionExists(topic: String, partitionId: Int): Partition = {
- val partitionOpt = allPartitions.get((topic, partitionId))
+ val partitionOpt = allReplicas.get((topic, partitionId))
partitionOpt match {
case Some(partition) => partition
case None =>
- throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId))
+ throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
+ .format(topic, partitionId, config.brokerId))
}
}
@@ -120,34 +97,32 @@ class ReplicaManager(val config: KafkaCo
val replicaAdded = partition.addReplica(remoteReplica)
if(replicaAdded)
- info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId))
+ info("Added remote replica %d for topic %s partition %s on broker %d"
+ .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId))
remoteReplica
}
def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
- val partitionOpt = allPartitions.get((topic, partitionId))
- partitionOpt match {
- case Some(partition) =>
- partition.getReplica(replicaId)
+ val replicasOpt = allReplicas.get((topic, partitionId))
+ replicasOpt match {
+ case Some(replicas) =>
+ replicas.getReplica(replicaId)
case None =>
None
}
}
def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
- val replicasOpt = allPartitions.get((topic, partitionId))
+ val replicasOpt = allReplicas.get((topic, partitionId))
replicasOpt match {
case Some(replicas) =>
Some(replicas.leaderReplica())
case None =>
throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
- "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+ "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
}
}
- def getPartition(topic: String, partitionId: Int): Option[Partition] =
- allPartitions.get((topic, partitionId))
-
private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
// set the replica leo
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
@@ -162,41 +137,38 @@ class ReplicaManager(val config: KafkaCo
val newHw = allLeos.min
val oldHw = partition.leaderHW()
if(newHw > oldHw) {
+ debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
partition.leaderHW(Some(newHw))
}else
debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
- replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
+ replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
}
- def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
- info("becoming Leader for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
- info("started the leader state transition for topic %s partition %d"
- .format(replica.topic, replica.partition.partitionId))
+ def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+ info("Broker %d started the leader state transition for topic %s partition %d"
+ .format(config.brokerId, replica.topic, replica.partition.partitionId))
try {
// read and cache the ISR
replica.partition.leaderId(Some(replica.brokerId))
- replica.partition.updateISR(leaderAndISR.ISR.toSet)
+ replica.partition.updateISR(currentISRInZk.toSet)
// stop replica fetcher thread, if any
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// also add this partition to the list of partitions for which the leader is the current broker
leaderReplicaLock.lock()
leaderReplicas += replica.partition
- info("completed the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId))
- ErrorMapping.NoError
+ info("Broker %d completed the leader state transition for topic %s partition %d"
+ .format(config.brokerId, replica.topic, replica.partition.partitionId))
}catch {
- case e => error("failed to complete the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId), e)
- ErrorMapping.UnknownCode
- /* TODO: add specific error code */
+ case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d"
+ .format(config.brokerId, replica.topic, replica.partition.partitionId), e)
}finally {
leaderReplicaLock.unlock()
}
}
-
- def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
- val leaderBrokerId: Int = leaderAndISR.leader
- info("starting the follower state transition to follow leader %d for topic %s partition %d"
- .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
+ def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+ info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
try {
// set the leader for this partition correctly on this broker
replica.partition.leaderId(Some(leaderBrokerId))
@@ -205,13 +177,13 @@ class ReplicaManager(val config: KafkaCo
log.truncateTo(replica.highWatermark())
case None =>
}
- debug("for partition [%s, %d], the leaderBroker is [%d]".format(replica.topic, replica.partition.partitionId, leaderAndISR.leader))
// get leader for this replica
val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
// become follower only if it is not already following the same leader
if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
- info("becoming follower to leader %d for topic %s partition %d".format(leaderBrokerId, replica.topic, replica.partition.partitionId))
+ info("broker %d becoming follower to leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// start fetcher thread to current leader
@@ -220,15 +192,11 @@ class ReplicaManager(val config: KafkaCo
// remove this replica's partition from the ISR expiration queue
leaderReplicaLock.lock()
leaderReplicas -= replica.partition
- info("completed the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId))
- ErrorMapping.NoError
- } catch {
- case e: BrokerNotExistException =>
- error("failed to complete the follower state transition to follow leader %d for topic %s partition %d because the leader broker does not exist in the cluster".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
- ErrorMapping.BrokerNotExistInZookeeperCode
- case e =>
- error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
- ErrorMapping.UnknownCode
+ info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+ }catch {
+ case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e)
}finally {
leaderReplicaLock.unlock()
}
@@ -236,18 +204,21 @@ class ReplicaManager(val config: KafkaCo
private def maybeShrinkISR(): Unit = {
try {
- info("evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+ info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
+ .format(config.replicaMaxLagTimeMs))
leaderReplicaLock.lock()
- leaderReplicas.foreach(partition => {
+ leaderReplicas.foreach { partition =>
+ // shrink ISR if a follower is slow or stuck
val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.size > 0)
- info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
+ info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
+ newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in memory
partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
}
- })
+ }
}catch {
case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
}finally {
@@ -262,7 +233,8 @@ class ReplicaManager(val config: KafkaCo
val leaderHW = partition.leaderHW()
replica.logEndOffset() >= leaderHW
}
- else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+ else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+ " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
}
def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
@@ -296,21 +268,21 @@ class ReplicaManager(val config: KafkaCo
* Flushes the highwatermark value for all partitions to the highwatermark file
*/
private def checkpointHighwaterMarks() {
- val highwaterMarksForAllPartitions = allPartitions.map
- { partition =>
- val topic = partition._1._1
- val partitionId = partition._1._2
- val localReplicaOpt = partition._2.getReplica(config.brokerId)
- val hw = localReplicaOpt match {
- case Some(localReplica) => localReplica.highWatermark()
- case None =>
- error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + " Replica metadata doesn't exist")
- 0L
- }
- (topic, partitionId) -> hw
- }.toMap
+ val highwaterMarksForAllPartitions = allReplicas.map { partition =>
+ val topic = partition._1._1
+ val partitionId = partition._1._2
+ val localReplicaOpt = partition._2.getReplica(config.brokerId)
+ val hw = localReplicaOpt match {
+ case Some(localReplica) => localReplica.highWatermark()
+ case None =>
+ error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
+ " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
+ 0L
+ }
+ (topic, partitionId) -> hw
+ }.toMap
highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
- info("Checkpointed high watermark data: %s".format(highwaterMarksForAllPartitions))
+ info("Checkpointed highwatermarks")
}
/**
@@ -320,9 +292,8 @@ class ReplicaManager(val config: KafkaCo
def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
def shutdown() {
- info("shut down")
replicaFetcherManager.shutdown()
checkpointHighwaterMarks()
- info("shuttedd down completely")
+ info("Replica manager shutdown on broker " + config.brokerId)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -39,7 +39,7 @@ class DelayedRequest(val keys: Seq[Any],
* request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition)
* to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request
* to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
- *
+ *
* For us the key is generally a (topic, partition) pair.
* By calling
* watch(delayedRequest)
@@ -47,27 +47,27 @@ class DelayedRequest(val keys: Seq[Any],
* val satisfied = update(key, request)
* when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
* new request.
- *
+ *
* An implementation provides extends two helper functions
* def checkSatisfied(request: R, delayed: T): Boolean
* this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
* request delayed. This method will likely also need to do whatever bookkeeping is necessary.
- *
+ *
* The second function is
* def expire(delayed: T)
* this function handles delayed requests that have hit their time limit without being satisfied.
- *
+ *
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) extends Logging{
- this.logIdent = logPrefix
+abstract class RequestPurgatory[T <: DelayedRequest, R] {
+
/* a list of requests watching each key */
private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
-
+
/* background thread expiring requests that have been waiting too long */
- private val expiredRequestReaper = new ExpiredRequestReaper(logPrefix)
+ private val expiredRequestReaper = new ExpiredRequestReaper
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
expirationThread.start()
-
+
/**
* Add a new delayed request watching the contained keys
*/
@@ -78,7 +78,7 @@ abstract class RequestPurgatory[T <: Del
}
expiredRequestReaper.enqueue(delayedRequest)
}
-
+
/**
* Update any watchers and return a list of newly satisfied requests.
*/
@@ -89,7 +89,7 @@ abstract class RequestPurgatory[T <: Del
else
w.collectSatisfiedRequests(request)
}
-
+
private def watchersFor(key: Any): Watchers = {
var lst = watchersForKey.get(key)
if(lst == null) {
@@ -98,46 +98,46 @@ abstract class RequestPurgatory[T <: Del
}
lst
}
-
+
/**
* Check if this request satisfied this delayed request
*/
protected def checkSatisfied(request: R, delayed: T): Boolean
-
+
/**
* Handle an expired delayed request
*/
protected def expire(delayed: T)
-
+
/**
* Shutdown the expirey thread
*/
def shutdown() {
expiredRequestReaper.shutdown()
}
-
+
/**
* A linked list of DelayedRequests watching some key with some associated bookeeping logic
*/
private class Watchers {
-
+
/* a few magic parameters to help do cleanup to avoid accumulating old watchers */
private val CleanupThresholdSize = 100
private val CleanupThresholdPrct = 0.5
-
+
private val requests = new LinkedList[T]
-
+
/* you can only change this if you have added something or marked something satisfied */
var liveCount = 0.0
-
+
def add(t: T) {
synchronized {
- requests.add(t)
- liveCount += 1
- maybePurge()
- }
+ requests.add(t)
+ liveCount += 1
+ maybePurge()
+ }
}
-
+
private def maybePurge() {
if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
val iter = requests.iterator()
@@ -148,56 +148,55 @@ abstract class RequestPurgatory[T <: Del
}
}
}
-
+
def decLiveCount() {
synchronized {
- liveCount -= 1
- }
+ liveCount -= 1
+ }
}
-
+
def collectSatisfiedRequests(request: R): Seq[T] = {
val response = new mutable.ArrayBuffer[T]
synchronized {
- val iter = requests.iterator()
- while(iter.hasNext) {
- val curr = iter.next
- if(curr.satisfied.get) {
- // another thread has satisfied this request, remove it
- iter.remove()
- } else {
- if(checkSatisfied(request, curr)) {
- iter.remove()
- val updated = curr.satisfied.compareAndSet(false, true)
- if(updated == true) {
- response += curr
- liveCount -= 1
- expiredRequestReaper.satisfyRequest()
- }
- }
- }
- }
- }
+ val iter = requests.iterator()
+ while(iter.hasNext) {
+ val curr = iter.next
+ if(curr.satisfied.get) {
+ // another thread has satisfied this request, remove it
+ iter.remove()
+ } else {
+ if(checkSatisfied(request, curr)) {
+ iter.remove()
+ val updated = curr.satisfied.compareAndSet(false, true)
+ if(updated == true) {
+ response += curr
+ liveCount -= 1
+ expiredRequestReaper.satisfyRequest()
+ }
+ }
+ }
+ }
+ }
response
}
}
-
+
/**
* Runnable to expire requests that have sat unfullfilled past their deadline
*/
- private class ExpiredRequestReaper(logPrefix: String) extends Runnable with Logging {
- this.logIdent = "ExpiredRequestReaper for " + logPrefix
-
+ private class ExpiredRequestReaper extends Runnable with Logging {
+
/* a few magic parameters to help do cleanup to avoid accumulating old watchers */
private val CleanupThresholdSize = 100
private val CleanupThresholdPrct = 0.5
-
+
private val delayed = new DelayQueue[T]
private val running = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
private val needsPurge = new AtomicBoolean(false)
/* The count of elements in the delay queue that are unsatisfied */
private val unsatisfied = new AtomicInteger(0)
-
+
/** Main loop for the expiry thread */
def run() {
while(running.get) {
@@ -205,18 +204,18 @@ abstract class RequestPurgatory[T <: Del
val curr = pollExpired()
expire(curr)
} catch {
- case ie: InterruptedException =>
+ case ie: InterruptedException =>
if(needsPurge.getAndSet(false)) {
val purged = purgeSatisfied()
debug("Forced purge of " + purged + " requests from delay queue.")
}
- case e: Exception =>
+ case e: Exception =>
error("Error in long poll expiry thread: ", e)
}
}
shutdownLatch.countDown()
}
-
+
/** Add a request to be expired */
def enqueue(t: T) {
delayed.add(t)
@@ -224,24 +223,23 @@ abstract class RequestPurgatory[T <: Del
if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
forcePurge()
}
-
+
private def forcePurge() {
needsPurge.set(true)
expirationThread.interrupt()
}
-
+
/** Shutdown the expiry thread*/
def shutdown() {
- debug("shutting down")
+ debug("Shutting down request expiry thread")
running.set(false)
expirationThread.interrupt()
shutdownLatch.await()
- debug("shut down completely")
}
-
+
/** Record the fact that we satisfied a request in the stats for the expiry queue */
def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-
+
/**
* Get the next expired event
*/
@@ -258,7 +256,7 @@ abstract class RequestPurgatory[T <: Del
}
throw new RuntimeException("This should not happen")
}
-
+
/**
* Delete all expired events from the delay queue
*/
@@ -275,5 +273,5 @@ abstract class RequestPurgatory[T <: Del
purged
}
}
-
+
}
\ No newline at end of file