You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 18:14:01 UTC
svn commit: r1368092 [2/4] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/
main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/
main/scala/kafka/network/ main/sca...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Wed Aug 1 16:13:59 2012
@@ -16,12 +16,11 @@
*/
package kafka.server
-import kafka.common.KafkaZookeeperClient
import collection.mutable.HashMap
+import collection._
import collection.immutable.Set
import kafka.cluster.Broker
import kafka.api._
-import java.lang.Object
import kafka.network.{Receive, BlockingChannel}
import kafka.utils.{ZkUtils, Logging}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
@@ -29,36 +28,44 @@ import org.I0Itec.zkclient.exception.ZkN
import java.util.concurrent.atomic.AtomicBoolean
import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
import org.apache.zookeeper.Watcher.Event.KeeperState
+import collection.JavaConversions._
+import java.lang.Object
-
-class RequestSendThread(val brokerId: Int,
+class RequestSendThread(val controllerId: Int,
+ val toBrokerId: Int,
val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
val channel: BlockingChannel)
- extends Thread("requestSendThread-" + brokerId) with Logging {
+ extends Thread("requestSendThread-" + toBrokerId) with Logging {
+ this.logIdent = "Controller %d, request send thread to broker %d, ".format(controllerId, toBrokerId)
val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutDownLatch = new CountDownLatch(1)
- private val lock = new Object
+ private val lock = new Object()
def shutDown(): Unit = {
- info("Shutting down controller request send thread to broker %d".format(brokerId))
+ info("shutting down")
isRunning.set(false)
interrupt()
shutDownLatch.await()
- info("Controller request send thread to broker %d shutting down completed".format(brokerId))
+ info("shutted down completed")
}
override def run(): Unit = {
try{
- info("In controller, thread for broker: " + brokerId + " started running")
while(isRunning.get()){
val queueItem = queue.take()
val request = queueItem._1
val callback = queueItem._2
var receive: Receive = null
- lock synchronized {
- channel.send(request)
- receive = channel.receive()
+ try{
+ lock synchronized {
+ channel.send(request)
+ receive = channel.receive()
+ }
+ } catch {
+ case e =>
+ // log it and let it go. Let controller shut it down. NOTE(review): receive is still null here, so the receive.buffer read below throws and ends this thread via the outer catch.
+ debug("Exception occurs", e)
}
var response: RequestOrResponse = null
@@ -68,13 +75,15 @@ class RequestSendThread(val brokerId: In
case RequestKeys.StopReplicaRequest =>
response = StopReplicaResponse.readFrom(receive.buffer)
}
+ trace("got a response %s".format(response))
+
if(callback != null){
callback(response)
}
}
} catch{
- case e: InterruptedException => warn("Controller request send thread to broker %d is intterrupted. Shutting down".format(brokerId))
- case e1 => error("Error in controller request send thread to broker %d down due to ".format(brokerId), e1)
+ case e: InterruptedException => warn("intterrupted. Shutting down")
+ case e1 => error("Error due to ", e1)
}
shutDownLatch.countDown()
}
@@ -85,9 +94,10 @@ class ControllerChannelManager(allBroker
private val messageChannels = new HashMap[Int, BlockingChannel]
private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
private val messageThreads = new HashMap[Int, RequestSendThread]
+ this.logIdent = "Channel manager on controller " + config.brokerId + ", "
for(broker <- allBrokers){
brokers.put(broker.id, broker)
- info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
+ info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
@@ -99,7 +109,7 @@ class ControllerChannelManager(allBroker
def startUp() = {
for((brokerId, broker) <- brokers){
- val thread = new RequestSendThread(brokerId, messageQueues(brokerId), messageChannels(brokerId))
+ val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId), messageChannels(brokerId))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
@@ -119,14 +129,13 @@ class ControllerChannelManager(allBroker
def addBroker(broker: Broker){
brokers.put(broker.id, broker)
messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
- info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
messageChannels.put(broker.id, channel)
- val thread = new RequestSendThread(broker.id, messageQueues(broker.id), messageChannels(broker.id))
+ val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
@@ -146,38 +155,62 @@ class ControllerChannelManager(allBroker
}
}
-class KafkaController(config : KafkaConfig) extends Logging {
- info("controller startup");
- private val lock = new Object
-
- private var zkClient: ZkClient = null
+class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
+ this.logIdent = "Controller " + config.brokerId + ", "
+ info("startup");
+ private val controllerLock = new Object
private var controllerChannelManager: ControllerChannelManager = null
private var allBrokers : Set[Broker] = null
+ private var allBrokerIds : Set[Int] = null
private var allTopics: Set[String] = null
+ private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
+ private var allLeaders: mutable.Map[(String, Int), Int] = null
- private def tryToBecomeController() = {
- lock synchronized {
- val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
- if (curController == null){
- try {
- ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString())
-
- // Only the broker successfully registering as the controller can execute following code, otherwise
- // some exception will be thrown.
- registerBrokerChangeListener()
- registerTopicChangeListener()
- allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
- allTopics = ZkUtils.getAllTopics(zkClient).toSet
- controllerChannelManager = new ControllerChannelManager(allBrokers, config)
- controllerChannelManager.startUp()
- } catch {
- case e: ZkNodeExistsException =>
- registerControllerExistListener()
- info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else")
- case e2 => throw e2
+ // Competes for the controller role by creating the ephemeral ZkUtils.ControllerPath node. On a win it registers the broker/topic listeners, reloads the cached cluster state, starts the channel manager and returns true; on ZkNodeExistsException it watches the controller path and returns false.
+ private def tryToBecomeController(): Boolean = {
+ try {
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
+ // Only the broker that created the controller path reaches the code below; otherwise the create call
+ // throws (ZkNodeExistsException is the case handled in the catch below).
+ registerBrokerChangeListener()
+ registerTopicChangeListener()
+ allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+ allBrokerIds = allBrokers.map(_.id)
+ info("all brokers: %s".format(allBrokerIds))
+ allTopics = ZkUtils.getAllTopics(zkClient).toSet
+ info("all topics: %s".format(allTopics))
+ allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
+ info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment))
+ allLeaders = new mutable.HashMap[(String, Int), Int] // leader cache starts empty on every win; filled by the failover steps
+ controllerChannelManager = new ControllerChannelManager(allBrokers, config)
+ controllerChannelManager.startUp()
+ return true
+ } catch {
+ case e: ZkNodeExistsException => // another broker already holds the controller path
+ registerControllerExistListener()
+ info("broker didn't succeed registering as the controller since it's taken by someone else")
+ return false
+ case e2 => throw e2 // any other failure propagates to the caller
+ }
+ }
+
+ private def controllerRegisterOrFailover(){
+ info("try to become controller")
+ if(tryToBecomeController()){
+ info("won the controller competition and work on leader and isr recovery")
+ deliverLeaderAndISRFromZookeeper(allBrokerIds, allTopics)
+ debug("work on broker changes")
+ onBrokerChange()
+
+ // Partitions that still have no cached leader get one initialized; iterate the source map, not the clone being pruned
+ val partitionReplicaAssignment = allPartitionReplicaAssignment.clone()
+ for((topicPartition, _) <- allPartitionReplicaAssignment){
+ if (allLeaders.contains(topicPartition)){
+ partitionReplicaAssignment.remove(topicPartition)
}
}
- else info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController)
+ debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
+ initLeaders(partitionReplicaAssignment)
}
}
@@ -186,17 +219,22 @@ class KafkaController(config : KafkaConf
}
def startup() = {
- zkClient = KafkaZookeeperClient.getZookeeperClient(config)
- registerSessionExpirationListener()
- registerControllerExistListener()
- tryToBecomeController()
+ controllerLock synchronized { // same lock the ZooKeeper listener callbacks take, so startup cannot interleave with them
+ registerSessionExpirationListener() // registered on every broker, whether or not it wins the election
+ registerControllerExistListener()
+ controllerRegisterOrFailover()
+ }
}
def shutDown() = {
- if(controllerChannelManager != null)
- controllerChannelManager.shutDown()
- if(zkClient != null)
- zkClient.close()
+ controllerLock synchronized { // zkClient comes from the constructor and is not closed here
+ if(controllerChannelManager != null){ // only non-null while this broker holds the controller role
+ info("shut down")
+ controllerChannelManager.shutDown()
+ controllerChannelManager = null
+ info("shutted down completely")
+ }
+ }
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
@@ -219,7 +257,8 @@ class KafkaController(config : KafkaConf
zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener())
}
- class SessionExpireListener() extends IZkStateListener {
+ class SessionExpireListener() extends IZkStateListener with Logging {
+ this.logIdent = "Controller " + config.brokerId + ", "
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
@@ -234,50 +273,256 @@ class KafkaController(config : KafkaConf
*/
@throws(classOf[Exception])
def handleNewSession() {
- info("Controller session expires, clean up the state, current controller: " + config.brokerId)
- controllerChannelManager.shutDown()
- controllerChannelManager = null
- info("Controller session expires, the channel manager shut downr: " + config.brokerId)
- tryToBecomeController()
+ controllerLock synchronized {
+ info("session expires, clean up the state")
+ if(controllerChannelManager != null) controllerChannelManager.shutDown() // null when this broker was not the controller
+ controllerChannelManager = null
+ controllerRegisterOrFailover()
+ }
}
}
+ /**
+ * Reads each partition's leaderAndISR from ZooKeeper, caches its leader in allLeaders, and sends it in an IsInit LeaderAndISRRequest to every broker in brokerIds that is an assigned replica of that partition; used on controller failover and for newly added brokers.
+ */
+ private def deliverLeaderAndISRFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
+ val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRForTopics(zkClient, topics.iterator)
+ val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
+ for((topicPartition, leaderAndISR) <- leaderAndISRInfos){
+ // Partitions whose recorded leader is not live are skipped: they are neither sent nor cached in allLeaders
+ if(allBrokerIds.contains(leaderAndISR.leader)){
+ val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
+ if(brokersAssignedToThisPartitionOpt == None){
+ warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
+ } else{
+ val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_)) // only the brokers this call targets
+ relatedBrokersAssignedToThisPartition.foreach(b => {
+ if(!brokerToLeaderAndISRInfosMap.contains(b))
+ brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
+ brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+ })
+ allLeaders.put(topicPartition, leaderAndISR.leader)
+ }
+ } else
+ debug("during leaderAndISR recovery, the leader %d is not alive any more, just ignore it".format(leaderAndISR.leader))
+ }
+ info("during leaderAndISR recovery, the broker to request map is [%s]".format(brokerToLeaderAndISRInfosMap.toString()))
+
+ brokerToLeaderAndISRInfosMap.foreach(m =>{ // one request per broker, carrying all of its partitions
+ val broker = m._1
+ val leaderAndISRs = m._2
+ val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRs)
+ info("during leaderAndISR recovery, the leaderAndISRRequest sent to new broker [%s] is [%s]".format(broker, leaderAndISRRequest.toString))
+ sendRequest(broker, leaderAndISRRequest)
+ })
+
+ info("after leaderAndISR recovery for brokers %s, the leaders assignment is %s".format(brokerIds, allLeaders))
+ }
+
+
+ private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) { // per partition: first live assigned replica leads, ISR = all live assigned replicas; written to ZK, cached in allLeaders, then sent (NotInit) to those replicas
+ val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndISR]]
+ for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) {
+ val liveAssignedReplicas = replicaAssignment.filter(r => allBrokerIds.contains(r))
+ debug("for topic [%s], partition [%d], live assigned replicas are: [%s]"
+ .format(topicPartition._1,
+ topicPartition._2,
+ liveAssignedReplicas))
+ if(!liveAssignedReplicas.isEmpty){
+ debug("live assigned replica is not empty, check zkClient: %s".format(zkClient))
+ val leader = liveAssignedReplicas.head
+ var leaderAndISR: LeaderAndISR = null
+ var updateLeaderISRZKPathSucceeded: Boolean = false
+ while(!updateLeaderISRZKPathSucceeded){ // retry until the create or the version-conditional update succeeds
+ val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
+ debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient))
+ if(curLeaderAndISROpt == None){ // no leaderAndISR path yet: create it
+ debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2))
+ leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList)
+ ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
+ updateLeaderISRZKPathSucceeded = true
+ } else{ // path exists: bump the epoch and write only if the ZK version is unchanged
+ debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is not empty".format(topicPartition._1, topicPartition._2))
+ val curZkPathVersion = curLeaderAndISROpt.get.zkVersion
+ leaderAndISR = new LeaderAndISR(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList, curLeaderAndISROpt.get.zkVersion + 1)
+ val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
+ if(updateSucceeded){
+ leaderAndISR.zkVersion = newVersion // replace the guessed version with the one ZK reported
+ }
+ updateLeaderISRZKPathSucceeded = updateSucceeded
+ }
+ }
+ liveAssignedReplicas.foreach(b => {
+ if(!brokerToLeaderAndISRInfosMap.contains(b))
+ brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
+ brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+ }
+ )
+ allLeaders.put(topicPartition, leaderAndISR.leader)
+ }
+ else{ // no assigned replica is live: nothing is written and the partition stays out of allLeaders
+ warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment, allBrokerIds))
+ }
+ }
+
+ info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfosMap))
+ brokerToLeaderAndISRInfosMap.foreach(m =>{
+ val broker = m._1
+ val leaderAndISRs = m._2
+ val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRs)
+ info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest))
+ sendRequest(broker, leaderAndISRRequest)
+ })
+ }
+
+
+ private def onBrokerChange(newBrokers: Set[Int] = null){
+ /** handle the new brokers, send request for them to initialize the local log **/
+ if(newBrokers != null)
+ deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
+
+ /** handle leader election for the partitions whose leader is no longer alive **/
+ val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
+ allLeaders.foreach(m =>{
+ val topicPartition = m._1
+ val leader = m._2
+ // We only care about the partitions, whose leader is no longer alive
+ if(!allBrokerIds.contains(leader)){
+ var updateLeaderISRZKPathSucceeded: Boolean = false
+ while(!updateLeaderISRZKPathSucceeded){
+ val assignedReplicasOpt = allPartitionReplicaAssignment.get(topicPartition)
+ if(assignedReplicasOpt == None)
+ throw new IllegalStateException("On broker changes, the assigned replica for [%s, %d], shouldn't be None, the general assignment is %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
+ val assignedReplicas = assignedReplicasOpt.get
+ val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => allBrokerIds.contains(r))
+ val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
+ if(curLeaderAndISROpt == None){
+ throw new IllegalStateException("On broker change, there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topicPartition._1, topicPartition._2))
+ }
+ val curLeaderAndISR = curLeaderAndISROpt.get
+ val leader = curLeaderAndISR.leader
+ var newLeader: Int = -1
+ val leaderEpoch = curLeaderAndISR.leaderEpoch
+ val ISR = curLeaderAndISR.ISR
+ val curZkPathVersion = curLeaderAndISR.zkVersion
+ debug("leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]".format(topicPartition._1, topicPartition._2, leader, leaderEpoch, ISR, curZkPathVersion))
+ // The leader is no longer alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader
+ var leaderAndISR: LeaderAndISR = null
+ // The ISR contains at least 1 broker in the live broker list
+ val liveBrokersInISR = ISR.filter(r => allBrokerIds.contains(r))
+ if(!liveBrokersInISR.isEmpty){
+ newLeader = liveBrokersInISR.head
+ leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch +1, liveBrokersInISR.toList, curZkPathVersion + 1)
+ debug("some broker in ISR is alive, new leader and ISR is %s".format(leaderAndISR.toString()))
+ } else{
+ debug("live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicasToThisPartition))
+ if (!liveAssignedReplicasToThisPartition.isEmpty){
+ newLeader = liveAssignedReplicasToThisPartition.head
+ leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch + 1, List(newLeader), curZkPathVersion + 1)
+ warn("on broker change, no broker in ISR is alive, new leader elected is [%s], there's potential data loss".format(newLeader))
+ } else
+ error("on broker change, for partition ([%s, %d]), live brokers are: [%s], assigned replicas are: [%s]; no asigned replica is alive".format(topicPartition._1, topicPartition._2, allBrokerIds, assignedReplicas))
+ }
+ debug("the leader and ISR converted string: [%s]".format(leaderAndISR)) // leaderAndISR is null when no assigned replica is alive: skip the ZK write and stop retrying; the dead leader stays cached so a later broker change retries
+ val (updateSucceeded, newVersion) = if(leaderAndISR == null) (true, curZkPathVersion) else ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
+ if(updateSucceeded && leaderAndISR != null){
+ leaderAndISR.zkVersion = newVersion
+ liveAssignedReplicasToThisPartition.foreach(b => {
+ if(!brokerToLeaderAndISRInfosMap.contains(b))
+ brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
+ brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+ })
+ allLeaders.put(topicPartition, newLeader)
+ info("on broker changes, allLeader is updated to %s".format(allLeaders))
+ }
+ updateLeaderISRZKPathSucceeded = updateSucceeded
+ }
+ }
+ })
+ trace("after acting on broker change, the broker to leaderAndISR request map is %s".format(brokerToLeaderAndISRInfosMap))
+ brokerToLeaderAndISRInfosMap.foreach(m => {
+ val broker = m._1
+ val leaderAndISRInfos = m._2
+ val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
+ sendRequest(broker, leaderAndISRRequest)
+ info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(broker, leaderAndISRRequest))
+ })
+ }
+
class BrokerChangeListener() extends IZkChildListener with Logging {
+ this.logIdent = "Controller " + config.brokerId + ", "
def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
- import scala.collection.JavaConversions._
- lock synchronized {
- info("Broker change listener at controller triggerred")
- val allBrokerIds = allBrokers.map(_.id)
+ controllerLock synchronized { // serialized with the other ZK listeners and with startup/shutDown
+ info("broker change listener triggered")
val curChildrenSeq: Seq[String] = javaCurChildren
val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
val curBrokerIds = curBrokerIdsSeq.toSet
val addedBrokerIds = curBrokerIds -- allBrokerIds
val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
- info("Added brokers: " + addedBrokerIds.toString())
val deletedBrokerIds = allBrokerIds -- curBrokerIds
- info("Deleted brokers: " + deletedBrokerIds.toString())
-
allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
+ allBrokerIds = allBrokers.map(_.id) // refresh the live-broker cache before leader re-election below
+ info("added brokers: %s, deleted brokers: %s, all brokers: %s".format(addedBrokerIds, deletedBrokerIds, allBrokerIds))
+ addedBrokersSeq.foreach(controllerChannelManager.addBroker(_))
+ deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
+ onBrokerChange(addedBrokerIds) // new brokers get their leaderAndISR; partitions whose cached leader is gone get a new leader
+ }
+ }
+ }
- for(broker <- addedBrokersSeq){
- controllerChannelManager.addBroker(broker)
- }
- for (brokerId <- deletedBrokerIds){
- controllerChannelManager.removeBroker(brokerId)
- }
- /** TODO: add other broker change handler logic**/
+ private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+ // initialize leaders for the partitions of the given new topics, using the assignment the caller read from ZK (no longer shadowed)
+ val newTopicsAssignment = partitionReplicaAssignment.filter(p => topics.contains(p._1._1))
+ trace("handling new topics, the partition replica assignment to be handled is %s".format(newTopicsAssignment))
+ initLeaders(newTopicsAssignment)
+ }
+
+ private def handleDeletedTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+ val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
+ for((topicPartition, brokers) <- partitionReplicaAssignment){
+ for (broker <- brokers){
+ if (!brokerToPartitionToStopReplicaMap.contains(broker))
+ brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
+ brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
}
+ allLeaders.remove(topicPartition)
+ ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2))
+ }
+ if(!partitionReplicaAssignment.isEmpty) info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap)) // logged once, with the complete map
+ for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
+ val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
+ info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
+ sendRequest(broker, stopReplicaRequest)
}
+ /*TODO: kafka-330 remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
}
class TopicChangeListener extends IZkChildListener with Logging {
+ this.logIdent = "Controller " + config.brokerId + ", "
+
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
- // TODO: Incomplete, do not need to review this time
+ controllerLock synchronized { // serialized with the other ZK listeners and with startup/shutDown
+ info("topic/partition change listener fired for path " + parentPath)
+ val currentChildren = JavaConversions.asBuffer(curChilds).toSet // children are diffed against allTopics, i.e. treated as topic names
+ val newTopics = currentChildren -- allTopics
+ val deletedTopics = allTopics -- currentChildren
+ val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1)) // captured before the cache is pruned below
+ allTopics = currentChildren
+
+ val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.iterator)
+ allPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
+ allPartitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+ info("new topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, allPartitionReplicaAssignment))
+ handleNewTopics(newTopics, addedPartitionReplicaAssignment)
+ handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
+ }
}
}
class ControllerExistListener extends IZkDataListener with Logging {
+ this.logIdent = "Controller " + config.brokerId + ", "
+
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
// do nothing, since No logic is needed here
@@ -285,8 +530,10 @@ class KafkaController(config : KafkaConf
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
- info("Controller fail over, broker " + config.brokerId + " try to become controller")
- tryToBecomeController()
+ controllerLock synchronized { // the watched controller path was deleted: re-run the election under the controller lock
+ info("the current controller failed, competes to be new controller")
+ controllerRegisterOrFailover()
+ }
}
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Wed Aug 1 16:13:59 2012
@@ -24,41 +24,44 @@ import java.util.concurrent.atomic.Atomi
/**
* A thread that answers kafka requests.
*/
-class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
-
+class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { // id and brokerId only label this handler's log output
+ this.logIdent = "Kafka Request Handler " + id + " on Broker " + brokerId + ", "
+
def run() {
while(true) {
val req = requestChannel.receiveRequest()
- trace("Processor " + Thread.currentThread.getName + " got request " + req)
- if(req == RequestChannel.AllDone)
+ if(req == RequestChannel.AllDone){
+ trace("receives shut down command, shut down")
return
+ }
+ debug("handles request " + req)
apis.handle(req)
}
}
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
-
}
-class KafkaRequestHandlerPool(val requestChannel: RequestChannel,
- val apis: KafkaApis,
+class KafkaRequestHandlerPool(val brokerId: Int, // passed to each handler and used in the log prefix
+ val requestChannel: RequestChannel,
+ val apis: KafkaApis,
numThreads: Int) extends Logging {
-
+ this.logIdent = "Kafka Request Handler on Broker " + brokerId + ", "
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
- runnables(i) = new KafkaRequestHandler(requestChannel, apis)
+ runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis) // the loop index doubles as the handler id
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}
def shutdown() {
- info("Shutting down request handlers")
+ info("shutting down")
for(handler <- runnables)
handler.shutdown
for(thread <- threads)
thread.join
- info("Request handlers shut down")
+ info("shutted down completely")
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Wed Aug 1 16:13:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -24,8 +24,9 @@ import kafka.utils._
import java.util.concurrent._
import atomic.AtomicBoolean
import kafka.cluster.Replica
+import kafka.api.LeaderAndISR
+import scala.collection._
import org.I0Itec.zkclient.ZkClient
-import kafka.common.KafkaZookeeperClient
/**
@@ -33,7 +34,7 @@ import kafka.common.KafkaZookeeperClient
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
-
+ this.logIdent = "Kafka Server " + config.brokerId + ", "
val CleanShutdownFile = ".kafka_cleanshutdown"
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
@@ -44,7 +45,7 @@ class KafkaServer(val config: KafkaConfi
var kafkaZookeeper: KafkaZooKeeper = null
var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
- var kafkaController: KafkaController = new KafkaController(config)
+ var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(4)
var zkClient: ZkClient = null
@@ -53,7 +54,7 @@ class KafkaServer(val config: KafkaConfi
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
- info("Starting Kafka server..." + config.brokerId)
+ info("starting")
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
var needRecovery = true
@@ -62,11 +63,10 @@ class KafkaServer(val config: KafkaConfi
needRecovery = false
cleanShutDownFile.delete
}
- /* start client */
- info("Connecting to ZK: " + config.zkConnect)
- zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+
/* start scheduler */
kafkaScheduler.startUp
+
/* start log manager */
logManager = new LogManager(config,
kafkaScheduler,
@@ -75,88 +75,107 @@ class KafkaServer(val config: KafkaConfi
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)
logManager.startup()
-
- socketServer = new SocketServer(config.port,
+
+ socketServer = new SocketServer(config.brokerId,
+ config.port,
config.numNetworkThreads,
config.monitoringPeriodSecs,
config.numQueuedRequests,
config.maxSocketRequestSize)
+
+ socketServer.startup
+
Utils.registerMBean(socketServer.stats, statsMBeanName)
- kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
+ /* start client */
+ kafkaZookeeper = new KafkaZooKeeper(config)
+ // starting relevant replicas and leader election for partitions assigned to this broker
+ kafkaZookeeper.startup
- replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler)
+ info("Connecting to ZK: " + config.zkConnect)
- apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
- requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
- socketServer.startup()
+ replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, deleteLog)
+ kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
+ apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper,
+ addReplica, stopReplica, makeLeader, makeFollower, config.brokerId)
+ requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
Mx4jLoader.maybeLoad
- // starting relevant replicas and leader election for partitions assigned to this broker
- kafkaZookeeper.startup()
+ /**
+ * Registers this broker in ZK. After this, consumers can connect to broker.
+ * So this should happen after socket server start.
+ */
// start the replica manager
replicaManager.startup()
// start the controller
kafkaController.startup()
-
- info("Server started.")
+ info("started")
}
-
+
+
/**
* Shutdown API for shutting down a single instance of the Kafka server.
* Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
*/
def shutdown() {
+ info("shutting down")
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
- info("Shutting down Kafka server with id " + config.brokerId)
+ if(requestHandlerPool != null)
+ requestHandlerPool.shutdown()
kafkaScheduler.shutdown()
apis.close()
+ kafkaZookeeper.shutdown()
if(replicaManager != null)
replicaManager.shutdown()
if (socketServer != null)
socketServer.shutdown()
- if(requestHandlerPool != null)
- requestHandlerPool.shutdown()
Utils.unregisterMBean(statsMBeanName)
if(logManager != null)
logManager.shutdown()
+
if(kafkaController != null)
kafkaController.shutDown()
- kafkaZookeeper.shutdown()
- zkClient.close()
+
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
- debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
+ debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
cleanShutDownFile.createNewFile
shutdownLatch.countDown()
- info("Kafka server with id %d shut down completed".format(config.brokerId))
+ info("shutted down completed")
}
}
-
+
/**
* After calling shutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = shutdownLatch.await()
def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
- info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
- // get local log
val log = logManager.getOrCreateLog(topic, partition)
replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
}
- def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
- replicaManager.makeLeader(replica, currentISRInZk)
+ def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+ replicaManager.makeLeader(replica, leaderAndISR)
}
- def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
- replicaManager.makeFollower(replica, leaderBrokerId, zkClient)
+ def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+ replicaManager.makeFollower(replica, leaderAndISR)
}
def getReplica(topic: String, partition: Int): Option[Replica] =
replicaManager.getReplica(topic, partition)
+ def stopReplica(topic: String, partition: Int): Short = {
+ replicaManager.stopReplica(topic, partition)
+ }
+
+ def deleteLog(topic: String, partition: Int): Unit = {
+ /* TODO: handle deleteLog in a better way */
+ //logManager.deleteLog(topic, partition)
+ }
+
def getLogManager(): LogManager = logManager
def getStats(): SocketServerStats = socketServer.stats
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Wed Aug 1 16:13:59 2012
@@ -18,46 +18,29 @@
package kafka.server
import java.net.InetAddress
-import kafka.cluster.Replica
import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
-import kafka.admin.AdminUtils
-import java.lang.Thread
-import collection.mutable.HashSet
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import kafka.common._
+
/**
* Handles the server's interaction with zookeeper. The server needs to register the following paths:
* /topics/[topic]/[node_id-partition_num]
* /brokers/[0...N] --> host:port
- *
*/
-class KafkaZooKeeper(config: KafkaConfig,
- zkClient: ZkClient,
- addReplicaCbk: (String, Int, Set[Int]) => Replica,
- getReplicaCbk: (String, Int) => Option[Replica],
- becomeLeader: (Replica, Seq[Int]) => Unit,
- becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
+class KafkaZooKeeper(config: KafkaConfig) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
- private var leaderChangeListener: LeaderChangeListener = null
- private var topicPartitionsChangeListener: TopicChangeListener = null
- private var stateChangeHandler: StateChangeCommandHandler = null
-
- private val topicListenerLock = new Object
- private val leaderChangeLock = new Object
-
- def startup() {
- leaderChangeListener = new LeaderChangeListener
- topicPartitionsChangeListener = new TopicChangeListener
- leaderChangeListener = new LeaderChangeListener
- topicPartitionsChangeListener = new TopicChangeListener
- startStateChangeCommandHandler()
- zkClient.subscribeStateChanges(new SessionExpireListener)
- registerBrokerInZk()
- subscribeToTopicAndPartitionsChanges(true)
- }
+ private var zkClient: ZkClient = null
+
+ def startup() {
+ /* start client */
+ info("connecting to ZK: " + config.zkConnect)
+ zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+ zkClient.subscribeStateChanges(new SessionExpireListener)
+ registerBrokerInZk()
+ }
private def registerBrokerInZk() {
info("Registering broker " + brokerIdPath)
@@ -66,13 +49,6 @@ class KafkaZooKeeper(config: KafkaConfig
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
}
- private def startStateChangeCommandHandler() {
- val stateChangeQ = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)
- stateChangeHandler = new StateChangeCommandHandler("StateChangeCommandHandler", config, stateChangeQ,
- ensureStateChangeCommandValidityOnThisBroker, ensureEpochValidity)
- stateChangeHandler.start()
- }
-
/**
* When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
* connection for us. We need to re-register this broker in the broker registry.
@@ -96,20 +72,24 @@ class KafkaZooKeeper(config: KafkaConfig
registerBrokerInZk()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
- zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
- val topics = ZkUtils.getAllTopics(zkClient)
- debug("Existing topics are %s".format(topics.mkString(",")))
- topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
- handleNewTopics(topics)
}
}
def shutdown() {
- stateChangeHandler.shutdown()
+ if (zkClient != null) {
+ info("Closing zookeeper client...")
+ zkClient.close()
+ }
+ }
+
+ private def doesTopicExistInCluster(topic: String) : Boolean = {
+ val allTopics = ZkUtils.getAllTopics(zkClient)
+ trace("all topics, %s, topic %s".format(allTopics, topic))
+ allTopics.contains(topic)
}
def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
- if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
+ if(!doesTopicExistInCluster(topic))
throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
// check if partition id is invalid
if(partition < 0)
@@ -124,256 +104,7 @@ class KafkaZooKeeper(config: KafkaConfig
}
}
- def getZookeeperClient = zkClient
-
- def handleNewTopics(topics: Seq[String]) {
- // get relevant partitions to this broker
- val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
- debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
- for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
- // subscribe to leader changes for these partitions
- subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
- // start replicas for these partitions
- startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
- }
- }
-
- def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
- info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
- zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
- val topics = ZkUtils.getAllTopics(zkClient)
- val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
- debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
- for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
- // subscribe to leader changes for these partitions
- subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
-
- // start replicas for these partitions
- if(startReplicas)
- startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
- }
- }
-
- private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) {
- partitions.foreach { partition =>
- info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition))
- // register leader change listener
- zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
- }
- }
-
- private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
- partitions.foreach { partition =>
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
- info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
- if(assignedReplicas.contains(config.brokerId)) {
- val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
- startReplica(replica)
- } else
- warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
- .format(partition, topic, config.brokerId))
- }
- }
-
- private def startReplica(replica: Replica) {
- info("Starting replica for topic %s partition %d on broker %d"
- .format(replica.topic, replica.partition.partitionId, replica.brokerId))
- ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
- case Some(leader) =>
- info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader))
- // check if this broker is the leader, if not, then become follower
- if(leader != config.brokerId)
- becomeFollower(replica, leader, zkClient)
- case None => // leader election
- leaderElection(replica)
- }
- }
-
- def leaderElection(replica: Replica) {
- info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
- // read the AR list for replica.partition from ZK
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
- val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
- if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) {
- info("Broker %d will participate in leader election for topic %s partition %d"
- .format(config.brokerId, replica.topic, replica.partition.partitionId))
- // wait for some time if it is not the preferred replica
- try {
- if(replica.brokerId != assignedReplicas.head) {
- // sleep only if the preferred replica is alive
- if(liveBrokers.contains(assignedReplicas.head)) {
- info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) +
- "partition %d is alive. Waiting for %d ms to allow it to become leader"
- .format(replica.partition.partitionId, config.preferredReplicaWaitTime))
- Thread.sleep(config.preferredReplicaWaitTime)
- }
- }
- } catch {
- case e => // ignoring
- }
- val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
- replica.partition.partitionId, replica.brokerId)
- newLeaderEpochAndISR match {
- case Some(epochAndISR) =>
- info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic,
- replica.partition.partitionId))
- info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId,
- epochAndISR._2.mkString(",")))
- becomeLeader(replica, epochAndISR._2)
- case None =>
- ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
- case Some(leader) =>
- becomeFollower(replica, leader, zkClient)
- case None =>
- error("Lost leader for topic %s partition %d right after leader election".format(replica.topic,
- replica.partition.partitionId))
- }
- }
- }
- }
-
- private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int],
- inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = {
- // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
- assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " +
- " %s partition %d".format(topic, partition))
- inSyncReplicas.size > 0 match {
- case true => // check if this broker is in the ISR. If yes, return true
- inSyncReplicas.contains(brokerId) match {
- case true =>
- info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) +
- " for topic %s partition %d".format(topic, partition))
- true
- case false =>
- // check if any broker in the ISR is alive. If not, return true only if this broker is in the AR
- val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r))
- liveBrokersInISR.isEmpty match {
- case true =>
- if(assignedReplicas.contains(brokerId)) {
- info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
- " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
- .format(partition, brokerId, assignedReplicas.mkString(",")))
- true
- } else {
- info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
- " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
- .format(partition, brokerId, assignedReplicas.mkString(",")))
- false
- }
- case false =>
- info("ISR for topic %s partition %d is %s. Out of these %s brokers are alive. Broker %d "
- .format(topic, partition, inSyncReplicas.mkString(",")) + "cannot become leader since it doesn't exist " +
- "in the ISR")
- false // let one of the live brokers in the ISR become the leader
- }
- }
- case false =>
- if(assignedReplicas.contains(brokerId)) {
- info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
- .format(topic, partition, brokerId) + "is part of the assigned replicas list")
- true
- } else {
- info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
- .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
- false
- }
- }
- }
-
- class TopicChangeListener extends IZkChildListener with Logging {
- private val allTopics = new HashSet[String]()
- // read existing topics, if any
- allTopics ++= ZkUtils.getAllTopics(zkClient)
-
- @throws(classOf[Exception])
- def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
- import collection.JavaConversions
- topicListenerLock.synchronized {
- debug("Topic/partition change listener fired for path " + parentPath)
- val currentChildren = JavaConversions.asBuffer(curChilds).toSet
- val newTopics = currentChildren -- allTopics
- val deletedTopics = allTopics -- currentChildren
- allTopics.clear()
- allTopics ++= currentChildren
-
- debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
- debug("Current topics in the cluster: [%s]".format(allTopics.mkString(",")))
- handleNewTopics(newTopics.toSeq)
- // TODO: Handle topic deletions
- // handleDeletedTopics(deletedTopics.toSeq)
- }
- }
-
- def doesTopicExistInCluster(topic: String): Boolean = {
- topicListenerLock.synchronized {
- allTopics.contains(topic)
- }
- }
- }
-
- private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
- // check if this broker hosts a replica for this topic and partition
- ZkUtils.isPartitionOnBroker(zkClient, stateChangeCommand.topic, stateChangeCommand.partition, config.brokerId)
- }
-
- private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = {
- // get the topic and partition that this request is meant for
- val topic = stateChangeCommand.topic
- val partition = stateChangeCommand.partition
- val epoch = stateChangeCommand.epoch
-
- val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
- // check if the request's epoch matches the current leader's epoch OR the admin command's epoch
- val validEpoch = (currentLeaderEpoch == epoch) || (epoch == AdminUtils.AdminEpoch)
- if(epoch > currentLeaderEpoch)
- throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
- "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
- validEpoch
- }
-
- class LeaderChangeListener extends IZkDataListener with Logging {
-
- @throws(classOf[Exception])
- def handleDataChange(dataPath: String, data: Object) {
- // handle leader change event for path
- val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
- val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
- val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
- debug("Leader change listener fired on broker %d for path %s. New leader is %d. New epoch is %d".format(config.brokerId,
- dataPath, newLeader, newEpoch))
- val topicPartitionInfo = dataPath.split("/")
- val topic = topicPartitionInfo.takeRight(4).head
- val partition = topicPartitionInfo.takeRight(2).head.toInt
- info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
- val replica = getReplicaCbk(topic, partition).getOrElse(null)
- assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
- .format(topic, partition, config.brokerId))
- replica.partition.leaderId(Some(newLeader))
- assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
- }
-
- @throws(classOf[Exception])
- def handleDataDeleted(dataPath: String) {
- leaderChangeLock.synchronized {
- // leader is deleted for topic partition
- val topic = dataPath.split("/").takeRight(4).head
- val partitionId = dataPath.split("/").takeRight(2).head.toInt
- debug("Leader deleted listener fired for topic %s partition %d on broker %d"
- .format(topic, partitionId, config.brokerId))
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
- if(assignedReplicas.contains(config.brokerId)) {
- val replica = getReplicaCbk(topic, partitionId)
- replica match {
- case Some(r) => leaderElection(r)
- case None => error("No replica exists for topic %s partition %s on broker %d"
- .format(topic, partitionId, config.brokerId))
- }
- }
- }
- }
+ def getZookeeperClient = {
+ zkClient
}
}
-
-
-
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Wed Aug 1 16:13:59 2012
@@ -20,10 +20,10 @@ package kafka.server
import kafka.cluster.Broker
class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
- extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
+ extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
- new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
+ new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
}
def shutdown() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Wed Aug 1 16:13:59 2012
@@ -18,64 +18,88 @@ package kafka.server
import kafka.log.Log
import kafka.cluster.{Partition, Replica}
-import collection.mutable
+import collection._
import mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
-import kafka.common.{KafkaException, InvalidPartitionException}
+import kafka.api.LeaderAndISR
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.common.{BrokerNotExistException, KafkaException, ErrorMapping, InvalidPartitionException}
-class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler)
- extends Logging {
- private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
+class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, deleteLocalLog: (String, Int) => Unit) extends Logging {
+
+ var allPartitions = new mutable.HashMap[(String, Int), Partition]()
private var leaderReplicas = new ListBuffer[Partition]()
private val leaderReplicaLock = new ReentrantLock()
private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+ this.logIdent = "Replica Manager on Broker " + config.brokerId + ", "
+
+ val hwCheckPointThreadStarted = new AtomicBoolean(false)
private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
- info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
+ info("Created highwatermark file %s".format(highwaterMarkCheckpoint.name))
+
+ def startHighWaterMarksCheckPointThread() = {
+ if(hwCheckPointThreadStarted.compareAndSet(false, true))
+ kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
+ }
def startup() {
// start the highwatermark checkpoint thread
- kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0,
- config.defaultFlushIntervalMs)
// start ISR expiration thread
kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
}
def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
- val localReplica = new Replica(config.brokerId, partition, topic, time,
- Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
-
+ var retReplica : Replica = null
val replicaOpt = partition.getReplica(config.brokerId)
replicaOpt match {
case Some(replica) =>
- info("Changing remote replica %s into a local replica".format(replica.toString))
+ info("changing remote replica %s into a local replica".format(replica.toString))
replica.log match {
case None =>
replica.log = Some(log)
case Some(log) => // nothing to do since log already exists
}
+ retReplica = replica
case None =>
+ val localReplica = new Replica(config.brokerId, partition, topic, time,
+ Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
partition.addReplica(localReplica)
+ info("adding local replica %d for topic %s partition %s on broker %d".format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+ retReplica = localReplica
}
val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
partition.assignedReplicas(Some(assignedReplicas))
// get the replica objects for the assigned replicas for this partition
- info("Added local replica %d for topic %s partition %s on broker %d"
- .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
- localReplica
+ retReplica
}
+ def stopReplica(topic: String, partition: Int): Short = {
+ trace("handling stop replica for partition [%s, %d]".format(topic, partition))
+ val errorCode = ErrorMapping.NoError
+ val replica = getReplica(topic, partition)
+ if(replica.isDefined){
+ replicaFetcherManager.removeFetcher(topic, partition)
+ deleteLocalLog(topic, partition)
+ allPartitions.remove((topic, partition))
+ info("after removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partition, allPartitions))
+ }
+ trace("finishes handling stop replica [%s, %d]".format(topic, partition))
+ errorCode
+ }
+
+
def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
- val newPartition = allReplicas.contains((topic, partitionId))
+ val newPartition = allPartitions.contains((topic, partitionId))
newPartition match {
case true => // partition exists, do nothing
- allReplicas.get((topic, partitionId)).get
+ allPartitions.get((topic, partitionId)).get
case false => // create remote replicas for each replica id in assignedReplicas
val partition = new Partition(topic, partitionId, time)
- allReplicas += (topic, partitionId) -> partition
+ allPartitions += (topic, partitionId) -> partition
(assignedReplicaIds - config.brokerId).foreach(
replicaId => addRemoteReplica(topic, partitionId, replicaId, partition))
partition
@@ -83,12 +107,11 @@ class ReplicaManager(val config: KafkaCo
}
def ensurePartitionExists(topic: String, partitionId: Int): Partition = {
- val partitionOpt = allReplicas.get((topic, partitionId))
+ val partitionOpt = allPartitions.get((topic, partitionId))
partitionOpt match {
case Some(partition) => partition
case None =>
- throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
- .format(topic, partitionId, config.brokerId))
+ throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId))
}
}
@@ -97,32 +120,34 @@ class ReplicaManager(val config: KafkaCo
val replicaAdded = partition.addReplica(remoteReplica)
if(replicaAdded)
- info("Added remote replica %d for topic %s partition %s on broker %d"
- .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId))
+ info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId))
remoteReplica
}
def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
- val replicasOpt = allReplicas.get((topic, partitionId))
- replicasOpt match {
- case Some(replicas) =>
- replicas.getReplica(replicaId)
+ val partitionOpt = allPartitions.get((topic, partitionId))
+ partitionOpt match {
+ case Some(partition) =>
+ partition.getReplica(replicaId)
case None =>
None
}
}
def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
- val replicasOpt = allReplicas.get((topic, partitionId))
+ val replicasOpt = allPartitions.get((topic, partitionId))
replicasOpt match {
case Some(replicas) =>
Some(replicas.leaderReplica())
case None =>
throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
- "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+ "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
}
}
+ def getPartition(topic: String, partitionId: Int): Option[Partition] =
+ allPartitions.get((topic, partitionId))
+
private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
// set the replica leo
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
@@ -137,38 +162,41 @@ class ReplicaManager(val config: KafkaCo
val newHw = allLeos.min
val oldHw = partition.leaderHW()
if(newHw > oldHw) {
- debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
partition.leaderHW(Some(newHw))
}else
debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
- replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
+ replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
}
- def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
- info("Broker %d started the leader state transition for topic %s partition %d"
- .format(config.brokerId, replica.topic, replica.partition.partitionId))
+ def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+ info("becoming Leader for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
+ info("started the leader state transition for topic %s partition %d"
+ .format(replica.topic, replica.partition.partitionId))
try {
// read and cache the ISR
replica.partition.leaderId(Some(replica.brokerId))
- replica.partition.updateISR(currentISRInZk.toSet)
+ replica.partition.updateISR(leaderAndISR.ISR.toSet)
// stop replica fetcher thread, if any
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// also add this partition to the list of partitions for which the leader is the current broker
leaderReplicaLock.lock()
leaderReplicas += replica.partition
- info("Broker %d completed the leader state transition for topic %s partition %d"
- .format(config.brokerId, replica.topic, replica.partition.partitionId))
+ info("completed the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId))
+ ErrorMapping.NoError
}catch {
- case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d"
- .format(config.brokerId, replica.topic, replica.partition.partitionId), e)
+ case e => error("failed to complete the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId), e)
+ ErrorMapping.UnknownCode
+ /* TODO: add specific error code */
}finally {
leaderReplicaLock.unlock()
}
}
- def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
- info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d"
- .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+
+ def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+ val leaderBrokerId: Int = leaderAndISR.leader
+ info("starting the follower state transition to follow leader %d for topic %s partition %d"
+ .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
try {
// set the leader for this partition correctly on this broker
replica.partition.leaderId(Some(leaderBrokerId))
@@ -177,13 +205,13 @@ class ReplicaManager(val config: KafkaCo
log.truncateTo(replica.highWatermark())
case None =>
}
+ debug("for partition [%s, %d], the leaderBroker is [%d]".format(replica.topic, replica.partition.partitionId, leaderAndISR.leader))
// get leader for this replica
val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
// become follower only if it is not already following the same leader
if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
- info("broker %d becoming follower to leader %d for topic %s partition %d"
- .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+ info("becoming follower to leader %d for topic %s partition %d".format(leaderBrokerId, replica.topic, replica.partition.partitionId))
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// start fetcher thread to current leader
@@ -192,11 +220,15 @@ class ReplicaManager(val config: KafkaCo
// remove this replica's partition from the ISR expiration queue
leaderReplicaLock.lock()
leaderReplicas -= replica.partition
- info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d"
- .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
- }catch {
- case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d"
- .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e)
+ info("completed the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId))
+ ErrorMapping.NoError
+ } catch {
+ case e: BrokerNotExistException =>
+ error("failed to complete the follower state transition to follow leader %d for topic %s partition %d because the leader broker does not exist in the cluster".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
+ ErrorMapping.BrokerNotExistInZookeeperCode
+ case e =>
+ error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
+ ErrorMapping.UnknownCode
}finally {
leaderReplicaLock.unlock()
}
@@ -204,21 +236,18 @@ class ReplicaManager(val config: KafkaCo
private def maybeShrinkISR(): Unit = {
try {
- info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
- .format(config.replicaMaxLagTimeMs))
+ info("evaluating ISR list of partitions to see which replicas can be removed from the ISR")
leaderReplicaLock.lock()
- leaderReplicas.foreach { partition =>
- // shrink ISR if a follower is slow or stuck
+ leaderReplicas.foreach(partition => {
val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.size > 0)
- info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
- newInSyncReplicas.map(_.brokerId).mkString(",")))
+ info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in memory
partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
}
- }
+ })
}catch {
case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
}finally {
@@ -233,8 +262,7 @@ class ReplicaManager(val config: KafkaCo
val leaderHW = partition.leaderHW()
replica.logEndOffset() >= leaderHW
}
- else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
- " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+ else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
}
def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
@@ -268,21 +296,21 @@ class ReplicaManager(val config: KafkaCo
* Flushes the highwatermark value for all partitions to the highwatermark file
*/
def checkpointHighwaterMarks() {
- val highwaterMarksForAllPartitions = allReplicas.map { partition =>
- val topic = partition._1._1
- val partitionId = partition._1._2
- val localReplicaOpt = partition._2.getReplica(config.brokerId)
- val hw = localReplicaOpt match {
- case Some(localReplica) => localReplica.highWatermark()
- case None =>
- error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
- " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
- 0L
- }
- (topic, partitionId) -> hw
- }.toMap
+ val highwaterMarksForAllPartitions = allPartitions.map
+ { partition =>
+ val topic = partition._1._1
+ val partitionId = partition._1._2
+ val localReplicaOpt = partition._2.getReplica(config.brokerId)
+ val hw = localReplicaOpt match {
+ case Some(localReplica) => localReplica.highWatermark()
+ case None =>
+ error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + " Replica metadata doesn't exist")
+ 0L
+ }
+ (topic, partitionId) -> hw
+ }.toMap
highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
- info("Checkpointed highwatermarks")
+ info("Checkpointed high watermark data: %s".format(highwaterMarksForAllPartitions))
}
/**
@@ -292,8 +320,9 @@ class ReplicaManager(val config: KafkaCo
def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
def shutdown() {
+ info("shutting down")
replicaFetcherManager.shutdown()
checkpointHighwaterMarks()
- info("Replica manager shutdown on broker " + config.brokerId)
+ info("shut down completely")
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Wed Aug 1 16:13:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -39,7 +39,7 @@ class DelayedRequest(val keys: Seq[Any],
* request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition)
* to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request
* to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
- *
+ *
* For us the key is generally a (topic, partition) pair.
* By calling
* watch(delayedRequest)
@@ -47,27 +47,27 @@ class DelayedRequest(val keys: Seq[Any],
* val satisfied = update(key, request)
* when a request relevant to the given key occurs. This triggers bookkeeping logic and returns any requests satisfied by this
* new request.
- *
+ *
* An implementation provides two helper functions
* def checkSatisfied(request: R, delayed: T): Boolean
* this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
* request delayed. This method will likely also need to do whatever bookkeeping is necessary.
- *
+ *
* The second function is
* def expire(delayed: T)
* this function handles delayed requests that have hit their time limit without being satisfied.
- *
+ *
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R] {
-
+abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) extends Logging{
+ this.logIdent = logPrefix
/* a list of requests watching each key */
private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
-
+
/* background thread expiring requests that have been waiting too long */
- private val expiredRequestReaper = new ExpiredRequestReaper
+ private val expiredRequestReaper = new ExpiredRequestReaper(logPrefix)
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
expirationThread.start()
-
+
/**
* Add a new delayed request watching the contained keys
*/
@@ -78,7 +78,7 @@ abstract class RequestPurgatory[T <: Del
}
expiredRequestReaper.enqueue(delayedRequest)
}
-
+
/**
* Update any watchers and return a list of newly satisfied requests.
*/
@@ -89,7 +89,7 @@ abstract class RequestPurgatory[T <: Del
else
w.collectSatisfiedRequests(request)
}
-
+
private def watchersFor(key: Any): Watchers = {
var lst = watchersForKey.get(key)
if(lst == null) {
@@ -98,46 +98,46 @@ abstract class RequestPurgatory[T <: Del
}
lst
}
-
+
/**
* Check if this request satisfied this delayed request
*/
protected def checkSatisfied(request: R, delayed: T): Boolean
-
+
/**
* Handle an expired delayed request
*/
protected def expire(delayed: T)
-
+
/**
* Shut down the expiry thread
*/
def shutdown() {
expiredRequestReaper.shutdown()
}
-
+
/**
* A linked list of DelayedRequests watching some key with some associated bookkeeping logic
*/
private class Watchers {
-
+
/* a few magic parameters to help do cleanup to avoid accumulating old watchers */
private val CleanupThresholdSize = 100
private val CleanupThresholdPrct = 0.5
-
+
private val requests = new LinkedList[T]
-
+
/* you can only change this if you have added something or marked something satisfied */
var liveCount = 0.0
-
+
def add(t: T) {
synchronized {
- requests.add(t)
- liveCount += 1
- maybePurge()
- }
+ requests.add(t)
+ liveCount += 1
+ maybePurge()
+ }
}
-
+
private def maybePurge() {
if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
val iter = requests.iterator()
@@ -148,55 +148,56 @@ abstract class RequestPurgatory[T <: Del
}
}
}
-
+
def decLiveCount() {
synchronized {
- liveCount -= 1
- }
+ liveCount -= 1
+ }
}
-
+
def collectSatisfiedRequests(request: R): Seq[T] = {
val response = new mutable.ArrayBuffer[T]
synchronized {
- val iter = requests.iterator()
- while(iter.hasNext) {
- val curr = iter.next
- if(curr.satisfied.get) {
- // another thread has satisfied this request, remove it
- iter.remove()
- } else {
- if(checkSatisfied(request, curr)) {
- iter.remove()
- val updated = curr.satisfied.compareAndSet(false, true)
- if(updated == true) {
- response += curr
- liveCount -= 1
- expiredRequestReaper.satisfyRequest()
- }
- }
- }
- }
- }
+ val iter = requests.iterator()
+ while(iter.hasNext) {
+ val curr = iter.next
+ if(curr.satisfied.get) {
+ // another thread has satisfied this request, remove it
+ iter.remove()
+ } else {
+ if(checkSatisfied(request, curr)) {
+ iter.remove()
+ val updated = curr.satisfied.compareAndSet(false, true)
+ if(updated == true) {
+ response += curr
+ liveCount -= 1
+ expiredRequestReaper.satisfyRequest()
+ }
+ }
+ }
+ }
+ }
response
}
}
-
+
/**
* Runnable to expire requests that have sat unfulfilled past their deadline
*/
- private class ExpiredRequestReaper extends Runnable with Logging {
-
+ private class ExpiredRequestReaper(logPrefix: String) extends Runnable with Logging {
+ this.logIdent = "ExpiredRequestReaper for " + logPrefix
+
/* a few magic parameters that decide when to force a purge of satisfied requests from the delay queue */
private val CleanupThresholdSize = 100
private val CleanupThresholdPrct = 0.5
-
+
private val delayed = new DelayQueue[T]
private val running = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
private val needsPurge = new AtomicBoolean(false)
/* The count of elements in the delay queue that are unsatisfied */
private val unsatisfied = new AtomicInteger(0)
-
+
/** Main loop for the expiry thread */
def run() {
while(running.get) {
@@ -204,18 +205,18 @@ abstract class RequestPurgatory[T <: Del
val curr = pollExpired()
expire(curr)
} catch {
- case ie: InterruptedException =>
+ case ie: InterruptedException =>
if(needsPurge.getAndSet(false)) {
val purged = purgeSatisfied()
debug("Forced purge of " + purged + " requests from delay queue.")
}
- case e: Exception =>
+ case e: Exception =>
error("Error in long poll expiry thread: ", e)
}
}
shutdownLatch.countDown()
}
-
+
/** Add a request to be expired */
def enqueue(t: T) {
delayed.add(t)
@@ -223,23 +224,24 @@ abstract class RequestPurgatory[T <: Del
if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
forcePurge()
}
-
+
private def forcePurge() {
needsPurge.set(true)
expirationThread.interrupt()
}
-
+
/** Shut down the expiry thread */
def shutdown() {
- debug("Shutting down request expiry thread")
+ debug("shutting down")
running.set(false)
expirationThread.interrupt()
shutdownLatch.await()
+ debug("shut down completely")
}
-
+
/** Record the fact that we satisfied a request in the stats for the expiry queue */
def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-
+
/**
* Get the next expired event
*/
@@ -256,7 +258,7 @@ abstract class RequestPurgatory[T <: Del
}
throw new RuntimeException("This should not happen")
}
-
+
/**
* Delete all expired events from the delay queue
*/
@@ -273,5 +275,5 @@ abstract class RequestPurgatory[T <: Del
purged
}
}
-
+
}
\ No newline at end of file