Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/07 05:18:38 UTC
[2/3] KAFKA-330 Delete topic;
reviewed by Jun Rao, Guozhang Wang and Joel Koshy
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 483559a..613aec6 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -23,6 +23,8 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException}
import kafka.utils.{ZkUtils, Logging}
import org.I0Itec.zkclient.IZkChildListener
import org.apache.log4j.Logger
+import kafka.controller.Callbacks._
+import kafka.utils.Utils._
/**
* This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -35,15 +37,19 @@ import org.apache.log4j.Logger
* Valid previous states are NewReplica, OnlineReplica or OfflineReplica
* 3. OfflineReplica : If a replica dies, it moves to this state. This happens when the broker hosting the replica
* is down. Valid previous states are NewReplica, OnlineReplica
- * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
+ * 4. ReplicaDeletionStarted: If replica deletion starts, it is moved to this state. Valid previous state is OfflineReplica
+ * 5. ReplicaDeletionSuccessful: If a replica responds with no error code in response to a delete replica request, it is
+ * moved to this state. Valid previous state is ReplicaDeletionStarted
+ * 6. ReplicaDeletionFailed: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
+ * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
+ * ReplicaDeletionSuccessful
*/
class ReplicaStateMachine(controller: KafkaController) extends Logging {
private val controllerContext = controller.controllerContext
private val controllerId = controller.config.brokerId
private val zkClient = controllerContext.zkClient
- var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
- val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
- controllerId, controller.clientId)
+ var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
+ val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
private val hasStarted = new AtomicBoolean(false)
this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -81,14 +87,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
* @param targetState The state that the replicas should be moved to
* The controller's allLeaders cache should have been updated before this
*/
- def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState) {
- info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
- try {
- brokerRequestBatch.newBatch()
- replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
- }catch {
- case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
+ def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
+ callbacks: Callbacks = (new CallbackBuilder).build) {
+ if(replicas.size > 0) {
+ info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
+ try {
+ brokerRequestBatch.newBatch()
+ replicas.foreach(r => handleStateChange(r, targetState, callbacks))
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
+ }catch {
+ case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
+ }
}
}
@@ -96,38 +105,53 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
* This API exercises the replica's state machine. It ensures that every state transition happens from a legal
* previous state to the target state. Valid state transitions are:
* NonExistentReplica --> NewReplica
- * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+ * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
+ * partition to every live broker
*
* NewReplica -> OnlineReplica
* --add the new replica to the assigned replica list if needed
*
* OnlineReplica,OfflineReplica -> OnlineReplica
- * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+ * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
+ * partition to every live broker
*
- * NewReplica,OnlineReplica -> OfflineReplica
+ * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionFailed -> OfflineReplica
* --send StopReplicaRequest to the replica (w/o deletion)
- * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker.
+ * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
+ * UpdateMetadata request for the partition to every live broker.
*
- * OfflineReplica -> NonExistentReplica
+ * OfflineReplica -> ReplicaDeletionStarted
* --send StopReplicaRequest to the replica (with deletion)
*
- * @param topic The topic of the replica for which the state transition is invoked
- * @param partition The partition of the replica for which the state transition is invoked
- * @param replicaId The replica for which the state transition is invoked
+ * ReplicaDeletionStarted -> ReplicaDeletionSuccessful
+ * -- mark the state of the replica in the state machine
+ *
+ * ReplicaDeletionStarted -> ReplicaDeletionFailed
+ * -- mark the state of the replica in the state machine
+ *
+ * ReplicaDeletionSuccessful -> NonExistentReplica
+ * -- remove the replica from the in memory partition replica assignment cache
+ *
+ *
+ * @param partitionAndReplica The replica for which the state transition is invoked
* @param targetState The end state that the replica should be moved to
*/
- def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+ def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
+ callbacks: Callbacks) {
+ val topic = partitionAndReplica.topic
+ val partition = partitionAndReplica.partition
+ val replicaId = partitionAndReplica.replica
val topicAndPartition = TopicAndPartition(topic, partition)
if (!hasStarted.get)
throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
"to %s failed because replica state machine has not started")
.format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
try {
- replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica)
+ replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
targetState match {
case NewReplica =>
- assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
+ assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
// start replica as a follower to the current leader for its partition
val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
leaderIsrAndControllerEpochOpt match {
@@ -140,22 +164,39 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
replicaAssignment)
case None => // new leader request will be sent to this replica when one gets elected
}
- replicaState.put((topic, partition, replicaId), NewReplica)
+ replicaState.put(partitionAndReplica, NewReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
.format(controllerId, controller.epoch, replicaId, topicAndPartition))
- case NonExistentReplica =>
- assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
+ case ReplicaDeletionStarted =>
+ assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
+ replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
// send stop replica command
- brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
+ brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
+ callbacks.stopReplicaResponseCallback)
+ stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionStarted"
+ .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+ case ReplicaDeletionFailed =>
+ assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
+ replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
+ stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionFailed"
+ .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+ case ReplicaDeletionSuccessful =>
+ assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
+ replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
+ stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionSuccessful"
+ .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+ case NonExistentReplica =>
+ assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
// remove this replica from the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
- replicaState.remove((topic, partition, replicaId))
+ replicaState.remove(partitionAndReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
- .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+ .format(controllerId, controller.epoch, replicaId, topicAndPartition))
case OnlineReplica =>
- assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
- replicaState((topic, partition, replicaId)) match {
+ assertValidPreviousStates(partitionAndReplica,
+ List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
+ replicaState(partitionAndReplica) match {
case NewReplica =>
// add this replica to the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
@@ -169,17 +210,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case Some(leaderIsrAndControllerEpoch) =>
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment)
- replicaState.put((topic, partition, replicaId), OnlineReplica)
+ replicaState.put(partitionAndReplica, OnlineReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
.format(controllerId, controller.epoch, replicaId, topicAndPartition))
case None => // that means the partition was never in OnlinePartition state, this means the broker never
// started a log for that partition and does not have a high watermark value for this partition
}
-
}
- replicaState.put((topic, partition, replicaId), OnlineReplica)
+ replicaState.put(partitionAndReplica, OnlineReplica)
case OfflineReplica =>
- assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+ assertValidPreviousStates(partitionAndReplica,
+ List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
// As an optimization, the controller removes dead replicas from the ISR
@@ -191,7 +232,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
// send the shrunk ISR state change request only to the leader
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
- replicaState.put((topic, partition, replicaId), OfflineReplica)
+ replicaState.put(partitionAndReplica, OfflineReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
.format(controllerId, controller.epoch, replicaId, topicAndPartition))
false
@@ -214,12 +255,34 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
}
- private def assertValidPreviousStates(topic: String, partition: Int, replicaId: Int, fromStates: Seq[ReplicaState],
+ def areAllReplicasForTopicDeleted(topic: String): Boolean = {
+ val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
+ val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
+ debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
+ replicaStatesForTopic.foldLeft(true)((deletionState, r) => deletionState && r._2 == ReplicaDeletionSuccessful)
+ }
+
+ def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
+ val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
+ val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
+ replicaStatesForTopic.foldLeft(false)((deletionState, r) => deletionState || r._2 == ReplicaDeletionStarted)
+ }
+
+ def replicasInState(topic: String, state: ReplicaState): Set[PartitionAndReplica] = {
+ replicaState.filter(r => r._1.topic.equals(topic) && r._2 == state).keySet
+ }
+
+ def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
+ val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionFailed)
+ replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
+ }
+
+ private def assertValidPreviousStates(partitionAndReplica: PartitionAndReplica, fromStates: Seq[ReplicaState],
targetState: ReplicaState) {
- assert(fromStates.contains(replicaState((topic, partition, replicaId))),
- "Replica %s for partition [%s,%d] should be in the %s states before moving to %s state"
- .format(replicaId, topic, partition, fromStates.mkString(","), targetState) +
- ". Instead it is in %s state".format(replicaState((topic, partition, replicaId))))
+ assert(fromStates.contains(replicaState(partitionAndReplica)),
+ "Replica %s should be in the %s states before moving to %s state"
+ .format(partitionAndReplica, fromStates.mkString(","), targetState) +
+ ". Instead it is in %s state".format(replicaState(partitionAndReplica)))
}
private def registerBrokerChangeListener() = {
@@ -235,14 +298,23 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val topic = topicPartition.topic
val partition = topicPartition.partition
assignedReplicas.foreach { replicaId =>
+ val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
controllerContext.liveBrokerIds.contains(replicaId) match {
- case true => replicaState.put((topic, partition, replicaId), OnlineReplica)
- case false => replicaState.put((topic, partition, replicaId), OfflineReplica)
+ case true => replicaState.put(partitionAndReplica, OnlineReplica)
+ case false =>
+ // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
+ // This is required during controller failover since during controller failover a broker can go down,
+ // so the replicas on that broker should be moved to ReplicaDeletionFailed to be on the safer side.
+ replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
}
}
}
}
+ def partitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
+ controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
+ }
+
/**
* This is the zookeeper listener that triggers all the state transitions for a replica
*/
@@ -250,7 +322,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
ControllerStats.leaderElectionTimer.time {
try {
@@ -282,6 +354,9 @@ sealed trait ReplicaState { def state: Byte }
case object NewReplica extends ReplicaState { val state: Byte = 1 }
case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
-case object NonExistentReplica extends ReplicaState { val state: Byte = 4 }
+case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4 }
+case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5 }
+case object ReplicaDeletionFailed extends ReplicaState { val state: Byte = 6 }
+case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }
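The case objects above only enumerate the states; the legal transitions are enforced at runtime by
assertValidPreviousStates. As a minimal standalone sketch of that guard-then-transition pattern
(illustrative names only; the controller's real key type is PartitionAndReplica):

import scala.collection.mutable

sealed trait State
case object Offline extends State
case object DeletionStarted extends State
case object DeletionSuccessful extends State

object ReplicaStateSketch {
  // hypothetical replica key, standing in for PartitionAndReplica
  case class Replica(topic: String, partition: Int, id: Int)

  private val state = mutable.Map.empty[Replica, State]

  // assert the current state is legal for this transition, then record the new state
  def transition(r: Replica, target: State, validFrom: Set[State]): Unit = {
    val current = state.getOrElse(r, Offline)
    require(validFrom.contains(current),
      s"$r should be in one of ${validFrom.mkString(",")} before moving to $target; it is in $current")
    state.put(r, target)
  }

  def main(args: Array[String]): Unit = {
    val r = Replica("test", 0, 1)
    transition(r, DeletionStarted, Set(Offline))             // like OfflineReplica -> ReplicaDeletionStarted
    transition(r, DeletionSuccessful, Set(DeletionStarted))  // like -> ReplicaDeletionSuccessful
    println(state(r))
  }
}

An attempt to jump straight from Offline to DeletionSuccessful fails the require() with a message
naming both states, which is how illegal controller transitions surface early.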
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
new file mode 100644
index 0000000..91a446d
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import collection.mutable
+import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
+import kafka.utils.Utils._
+import collection.Set
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.api.{StopReplicaResponse, RequestOrResponse}
+
+/**
+ * This manages the state machine for topic deletion.
+ * 1. TopicCommand issues topic deletion by creating a new admin path /admin/delete_topics/<topic>
+ * 2. The controller listens for child changes on /admin/delete_topics and starts topic deletion for the respective topics
+ * 3. The controller has a background thread that handles topic deletion. The purpose of having this background thread
+ * is to accommodate the TTL feature, when we have it. This thread is signaled whenever deletion for a topic needs to
+ * be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the
+ * controller. In the future, it can be triggered based on the configured TTL for the topic. A topic's deletion will
+ * be halted in the following scenarios -
+ * 3.1 broker hosting one of the replicas for that topic goes down
+ * 3.2 partition reassignment for partitions of that topic is in progress
+ * 3.3 preferred replica election for partitions of that topic is in progress
+ * (though this is not strictly required since it holds the controller lock for the entire duration from start to end)
+ * 4. Topic deletion is resumed when -
+ * 4.1 broker hosting one of the replicas for that topic is started
+ * 4.2 preferred replica election for partitions of that topic completes
+ * 4.3 partition reassignment for partitions of that topic completes
+ * 5. Every replica for a topic being deleted is in one of the 3 states -
+ * 5.1 ReplicaDeletionStarted (Replica enters ReplicaDeletionStarted phase when the onPartitionDeletion callback is invoked.
+ * This happens when the child change watch for /admin/delete_topics fires on the controller. As part of this state
+ * change, the controller sends StopReplicaRequests to all replicas. It registers a callback for the
+ * StopReplicaResponse when deletePartition=true thereby invoking a callback when a response for delete replica
+ * is received from every replica)
+ * 5.2 ReplicaDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas from
+ * ReplicaDeletionStarted->ReplicaDeletionSuccessful depending on the error codes in StopReplicaResponse)
+ * 5.3 ReplicaDeletionFailed (deleteTopicStopReplicaCallback() moves replicas from
+ * ReplicaDeletionStarted->ReplicaDeletionFailed depending on the error codes in StopReplicaResponse.
+ * In general, if a broker dies and if it hosted replicas for topics being deleted, the controller marks the
+ * respective replicas in ReplicaDeletionFailed state in the onBrokerFailure callback. The reason is that if a
+ * broker fails before the request is sent and after the replica is in ReplicaDeletionStarted state,
+ * it is possible that the replica will mistakenly remain in ReplicaDeletionStarted state and topic deletion
+ * will not be retried when the broker comes back up.)
+ * 6. The delete topic thread marks a topic successfully deleted only if all replicas are in ReplicaDeletionSuccessful
+ * state and it starts the topic deletion teardown mode where it deletes all topic state from the controllerContext
+ * as well as from zookeeper. This is the only time the /brokers/topics/<topic> path gets deleted. On the other hand,
+ * if no replica is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionFailed state, then
+ * it marks the topic for deletion retry.
+ * @param controller
+ * @param initialTopicsToBeDeleted The topics that are queued up for deletion in zookeeper at the time of controller failover
+ * @param initialHaltedTopicsForDeletion The topics for which deletion is halted due to any of the conditions mentioned in #3 above
+ */
+class TopicDeletionManager(controller: KafkaController,
+ initialTopicsToBeDeleted: Set[String] = Set.empty,
+ initialHaltedTopicsForDeletion: Set[String] = Set.empty) extends Logging {
+ val controllerContext = controller.controllerContext
+ val partitionStateMachine = controller.partitionStateMachine
+ val replicaStateMachine = controller.replicaStateMachine
+ var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
+ var haltedTopicsForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
+ (initialHaltedTopicsForDeletion & initialTopicsToBeDeleted)
+ val deleteTopicsCond = controllerContext.controllerLock.newCondition()
+ var deleteTopicStateChanged: Boolean = false
+ var deleteTopicsThread: DeleteTopicsThread = null
+
+ /**
+ * Invoked at the end of new controller initiation
+ */
+ def start() {
+ deleteTopicsThread = new DeleteTopicsThread()
+ deleteTopicStateChanged = true
+ deleteTopicsThread.start()
+ }
+
+ /**
+ * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
+ */
+ def shutdown() {
+ deleteTopicsThread.shutdown()
+ topicsToBeDeleted.clear()
+ haltedTopicsForDeletion.clear()
+ }
+
+ /**
+ * Invoked by the child change listener on /admin/delete_topics to queue up the topics for deletion. The topic gets added
+ * to the topicsToBeDeleted list and only gets removed from the list when the topic deletion has completed successfully
+ * i.e. all replicas of all partitions of that topic are deleted successfully.
+ * @param topics Topics that should be deleted
+ */
+ def enqueueTopicsForDeletion(topics: Set[String]) {
+ topicsToBeDeleted ++= topics
+ resumeTopicDeletionThread()
+ }
+
+ /**
+ * Invoked when any event that can possibly resume topic deletion occurs. These events include -
+ * 1. New broker starts up. Any replicas belonging to topics queued up for deletion can be deleted since the broker is up
+ * 2. Partition reassignment completes. Any partitions belonging to topics queued up for deletion finished reassignment
+ * 3. Preferred replica election completes. Any partitions belonging to topics queued up for deletion finished
+ * preferred replica election
+ * @param topics Topics for which deletion can be resumed
+ */
+ def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
+ val topicsToResumeDeletion = topics & topicsToBeDeleted
+ if(topicsToResumeDeletion.size > 0) {
+ haltedTopicsForDeletion --= topicsToResumeDeletion
+ resumeTopicDeletionThread()
+ }
+ }
+
+ /**
+ * Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for
+ * StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas
+ * are moved from ReplicaDeletionStarted to ReplicaDeletionFailed state. Also, the topic is added to the list of topics
+ * for which deletion is halted until further notice. The delete topic thread is notified so it can retry topic deletion
+ * if it has received a response for all replicas of a topic to be deleted
+ * @param replicas Replicas for which deletion has failed
+ */
+ def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
+ val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+ if(replicasThatFailedToDelete.size > 0) {
+ val topics = replicasThatFailedToDelete.map(_.topic)
+ debug("Deletion failed for replicas %s. Halting deletion for topics %s"
+ .format(replicasThatFailedToDelete.mkString(","), topics))
+ controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionFailed)
+ haltTopicDeletion(topics)
+ resumeTopicDeletionThread()
+ }
+ }
+
+ /**
+ * Halt topic deletion if -
+ * 1. replicas for the topic are down
+ * 2. partition reassignment is in progress for some partitions of the topic
+ * 3. preferred replica election is in progress for some partitions of the topic
+ * @param topics Topics for which deletion should be halted. No-op if the topic was not previously queued up for deletion
+ */
+ def haltTopicDeletion(topics: Set[String]) {
+ val newTopicsToHaltDeletion = topicsToBeDeleted & topics
+ haltedTopicsForDeletion ++= newTopicsToHaltDeletion
+ if(newTopicsToHaltDeletion.size > 0)
+ info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+ }
+
+ def isTopicDeletionHalted(topic: String): Boolean = {
+ haltedTopicsForDeletion.contains(topic)
+ }
+
+ def isTopicDeletionInProgress(topic: String): Boolean = {
+ controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+ }
+
+ def isTopicQueuedUpForDeletion(topic: String): Boolean = {
+ topicsToBeDeleted.contains(topic)
+ }
+
+ /**
+ * Invoked by the delete-topic-thread to wait until events that either trigger, restart or halt topic deletion occur.
+ * controllerLock should be acquired before invoking this API
+ */
+ private def awaitTopicDeletionNotification() {
+ while(!deleteTopicStateChanged) {
+ info("Waiting for signal to start or continue topic deletion")
+ deleteTopicsCond.await()
+ }
+ deleteTopicStateChanged = false
+ }
+
+ /**
+ * Signals the delete-topic-thread to process topic deletion
+ */
+ private def resumeTopicDeletionThread() {
+ deleteTopicStateChanged = true
+ deleteTopicsCond.signal()
+ }
+
+ /**
+ * Invoked by the StopReplicaResponse callback when it receives no error code for a replica of a topic to be deleted.
+ * As part of this, the replicas are moved from ReplicaDeletionStarted to ReplicaDeletionSuccessful state. The delete
+ * topic thread is notified so it can tear down the topic if all replicas of a topic have been successfully deleted
+ * @param replicas Replicas that were successfully deleted by the broker
+ */
+ private def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
+ val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+ debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
+ controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
+ resumeTopicDeletionThread()
+ }
+
+ /**
+ * Topic deletion can be retried if -
+ * 1. Topic deletion is not already complete
+ * 2. Topic deletion is currently not in progress for that topic
+ * 3. Topic deletion is not currently halted for that topic
+ * @param topic Topic
+ * @return Whether or not deletion can be retried for the topic
+ */
+ private def isTopicEligibleForDeletion(topic: String): Boolean = {
+ topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicDeletionHalted(topic))
+ }
+
+ /**
+ * If the topic is queued for deletion but deletion is not currently in progress, then deletion is retried for that topic.
+ * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionFailed to OfflineReplica state
+ * @param topic Topic for which deletion should be retried
+ */
+ private def markTopicForDeletionRetry(topic: String) {
+ // reset replica states from ReplicaDeletionFailed to OfflineReplica
+ val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+ controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
+ }
+
+ private def completeDeleteTopic(topic: String) {
+ val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+ // controller will remove this replica from the state machine as well as its partition assignment cache
+ replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
+ val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
+ // move respective partition to OfflinePartition and NonExistentPartition state
+ partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
+ partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
+ topicsToBeDeleted -= topic
+ controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
+ controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+ controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
+ controllerContext.removeTopic(topic)
+ }
+
+ /**
+ * This callback is invoked by the DeleteTopics thread with the list of topics to be deleted
+ * It invokes the delete partition callback for all partitions of a topic
+ */
+ private def onTopicDeletion(topics: Set[String]) {
+ info("Topic deletion callback for %s".format(topics.mkString(",")))
+ val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
+ topics.foreach { topic =>
+ onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
+ }
+ }
+
+ /**
+ * Invoked by the onPartitionDeletion callback. It is the 2nd step of topic deletion, the first being sending
+ * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
+ * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
+ * is never retried. A topic is removed from the in progress list when
+ * 1. Either the topic is successfully deleted OR
+ * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionFailed state
+ * If the topic is queued for deletion but deletion is not currently in progress, then deletion is retried for that topic
+ * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
+ * the replicas a StopReplicaRequest (delete=true)
+ * This callback does the following things -
+ * 1. Send metadata request to all brokers excluding the topics to be deleted
+ * 2. Move all dead replicas directly to ReplicaDeletionFailed state. Also halt the deletion of respective topics if
+ * some replicas are dead since it won't complete successfully anyway
+ * 3. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
+ * @param replicasForTopicsToBeDeleted Replicas for partitions of topics that are queued up for deletion
+ */
+ private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
+ replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
+ // send update metadata so that brokers stop serving data
+ controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+ val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
+ val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
+ val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+ val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
+ // move dead replicas directly to failed state
+ replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionFailed)
+ // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
+ replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
+ debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
+ controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
+ new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
+ if(deadReplicasForTopic.size > 0)
+ haltTopicDeletion(Set(topic))
+ }
+ }
+
+ /**
+ * This callback is invoked by the delete topic callback with the list of partitions for topics to be deleted
+ * It does the following -
+ * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) with all partitions except those for
+ * which the topics are being deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
+ * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
+ * and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
+ * it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
+ * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
+ * will delete all persistent data from all replicas of the respective partitions
+ */
+ private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
+ info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
+ val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
+ startReplicaDeletion(replicasPerPartition)
+ }
+
+ private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: RequestOrResponse, replicaId: Int) {
+ val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
+ debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+ val partitionsInError = if(stopReplicaResponse.errorCode != ErrorMapping.NoError) {
+ stopReplicaResponse.responseMap.keySet
+ } else
+ stopReplicaResponse.responseMap.filter(p => p._2 != ErrorMapping.NoError).map(_._1).toSet
+ val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+ inLock(controllerContext.controllerLock) {
+ // move all the failed replicas to ReplicaDeletionFailed
+ failReplicaDeletion(replicasInError)
+ if(replicasInError.size != stopReplicaResponse.responseMap.size) {
+ // some replicas could have been successfully deleted
+ val deletedReplicas = stopReplicaResponse.responseMap.keySet -- partitionsInError
+ completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
+ }
+ }
+ }
+
+ class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
+ val zkClient = controllerContext.zkClient
+ override def doWork() {
+ inLock(controllerContext.controllerLock) {
+ awaitTopicDeletionNotification()
+ val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
+ if(topicsQueuedForDeletion.size > 0)
+ info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
+ topicsQueuedForDeletion.foreach { topic =>
+ // if all replicas are marked as deleted successfully, then topic deletion is done
+ if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
+ // clear up all state for this topic from controller cache and zookeeper
+ completeDeleteTopic(topic)
+ info("Deletion of topic %s successfully completed".format(topic))
+ } else {
+ if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
+ // ignore since topic deletion is in progress
+ val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
+ val replicaIds = replicasInDeletionStartedState.map(_.replica)
+ val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
+ info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
+ partitions.mkString(","), topic))
+ } else {
+ // if you come here, then no replica is in ReplicaDeletionStarted state and not all replicas are in
+ // ReplicaDeletionSuccessful state. That means there is at least one failed replica, so topic deletion
+ // should be retried
+ val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+ // mark topic for deletion retry
+ markTopicForDeletionRetry(topic)
+ info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
+ .format(topic, replicasInTopicDeletionFailedState.mkString(",")))
+ }
+ }
+ // Try to delete the topic if it is eligible for deletion.
+ if(isTopicEligibleForDeletion(topic)) {
+ info("Deletion of topic %s (re)started".format(topic))
+ // topic deletion will be kicked off
+ onTopicDeletion(Set(topic))
+ } else if(isTopicDeletionHalted(topic)) {
+ info("Not retrying deletion of topic %s at this time since it is halted".format(topic))
+ }
+ }
+ }
+ }
+ }
+}
+
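The hand-off between the enqueue/resume methods above and DeleteTopicsThread.doWork is the standard
lock + condition + flag pattern. A minimal standalone sketch of just that pattern, assuming nothing
about the controller (in the real code the lock is the shared controllerLock):

import java.util.concurrent.locks.ReentrantLock

object DeletionSignalSketch {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private var stateChanged = false

  // mirrors awaitTopicDeletionNotification: caller must hold the lock
  def awaitNotification(): Unit = {
    while (!stateChanged)
      cond.await()
    stateChanged = false
  }

  // mirrors resumeTopicDeletionThread: caller must hold the lock
  def notifyWorker(): Unit = {
    stateChanged = true
    cond.signal()
  }

  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        lock.lock()
        try { awaitNotification(); println("deletion work resumed") } finally lock.unlock()
      }
    })
    worker.start()
    lock.lock()
    try notifyWorker() finally lock.unlock()
    worker.join()
  }
}

The boolean flag is what makes a signal that arrives before the worker starts waiting safe: the
worker checks the flag under the lock and skips the await entirely.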
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index d22dabd..ab04b3f 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -76,7 +76,7 @@ class BlockingChannel( val host: String,
// but let's do it to be sure.
swallow(channel.close())
swallow(channel.socket.close())
- swallow(readChannel.close())
+ if(readChannel != null) swallow(readChannel.close())
channel = null; readChannel = null; writeChannel = null
connected = false
}
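The guard above matters because disconnect() can run when connect() never completed, in which case
readChannel was never assigned and is still null. A reduced sketch of the pattern, with swallow
standing in for kafka.utils.Utils.swallow:

import java.nio.channels.ReadableByteChannel

object DisconnectSketch {
  // stand-in for kafka.utils.Utils.swallow: run the action, log any failure instead of throwing
  def swallow(action: => Unit): Unit =
    try action catch { case e: Throwable => println("swallowed: " + e) }

  // readChannel is only assigned once a connection is established, so guard the close
  def closeReadChannel(readChannel: ReadableByteChannel): Unit =
    if (readChannel != null) swallow(readChannel.close())
}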
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bd7940b..c56ad50 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,9 +23,7 @@ import kafka.message._
import kafka.network._
import kafka.log._
import kafka.utils.ZKGroupTopicDirs
-import org.apache.log4j.Logger
import scala.collection._
-import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
import kafka.metrics.KafkaMetricsGroup
@@ -54,7 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel,
private val delayedRequestMetrics = new DelayedRequestMetrics
/* following 3 data structures are updated by the update metadata request
* and is queried by the topic metadata request. */
- var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
+ var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
private val partitionMetadataLock = new Object
@@ -87,7 +85,16 @@ class KafkaApis(val requestChannel: RequestChannel,
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
+ // ensureTopicExists is only for client facing requests
+ private def ensureTopicExists(topic: String) = {
+ if(!metadataCache.exists { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic.equals(topic)} )
+ throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+ }
+
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
+ // ensureTopicExists is only for client facing requests
+ // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+ // stop serving data to clients for the topic being deleted
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
try {
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
@@ -101,6 +108,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleStopReplicaRequest(request: RequestChannel.Request) {
+ // ensureTopicExists is only for client facing requests
+ // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+ // stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
@@ -110,6 +120,9 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+ // ensureTopicExists is only for client facing requests
+ // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+ // stop serving data to clients for the topic being deleted
val stateChangeLogger = replicaManager.stateChangeLogger
if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
@@ -124,18 +137,38 @@ class KafkaApis(val requestChannel: RequestChannel,
// cache the list of alive brokers in the cluster
updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
- leaderCache.put(partitionState._1, partitionState._2)
+ metadataCache.put(partitionState._1, partitionState._2)
if(stateChangeLogger.isTraceEnabled)
stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
}
+ // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
+ // currently being deleted by the controller
+ val topicsKnownToThisBroker = metadataCache.map{
+ case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
+ val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
+ case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
+ val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
+ val partitionsToBeDeleted = metadataCache.filter {
+ case(topicAndPartition, partitionStateInfo) => deletedTopics.contains(topicAndPartition.topic)
+ }.keySet
+ partitionsToBeDeleted.foreach { partition =>
+ metadataCache.remove(partition)
+ if(stateChangeLogger.isTraceEnabled)
+ stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+ "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
+ updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+ }
}
val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
}
def handleControlledShutdownRequest(request: RequestChannel.Request) {
+ // ensureTopicExists is only for client facing requests
+ // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+ // stop serving data to clients for the topic being deleted
val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
@@ -245,6 +278,7 @@ class KafkaApis(val requestChannel: RequestChannel,
BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
try {
+ ensureTopicExists(topicAndPartition.topic)
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info =
partitionOpt match {
@@ -347,6 +381,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
val partitionData =
try {
+ ensureTopicExists(topic)
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
@@ -417,6 +452,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseMap = offsetRequest.requestInfo.map(elem => {
val (topicAndPartition, partitionOffsetRequestInfo) = elem
try {
+ ensureTopicExists(topicAndPartition.topic)
// ensure leader exists
val localReplica = if(!offsetRequest.isFromDebuggingClient)
replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -524,18 +560,18 @@ class KafkaApis(val requestChannel: RequestChannel,
metadataRequest.topics.toSet
else {
partitionMetadataLock synchronized {
- leaderCache.keySet.map(_.topic)
+ metadataCache.keySet.map(_.topic)
}
}
}
val topicMetadataList =
partitionMetadataLock synchronized {
uniqueTopics.map { topic =>
- if(leaderCache.keySet.map(_.topic).contains(topic)) {
- val partitionStateInfo = leaderCache.filter(p => p._1.topic.equals(topic))
+ if(metadataCache.keySet.map(_.topic).contains(topic)) {
+ val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
- val replicas = leaderCache(topicAndPartition).allReplicas
+ val replicas = metadataCache(topicAndPartition).allReplicas
var replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
var leaderInfo: Option[Broker] = None
var isrInfo: Seq[Broker] = Nil
@@ -607,6 +643,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicAndPartition, metaAndError) => {
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
try {
+ ensureTopicExists(topicAndPartition.topic)
if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
(topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
} else {
@@ -632,6 +669,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseInfo = offsetFetchRequest.requestInfo.map( t => {
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
try {
+ ensureTopicExists(t.topic)
val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
payloadOpt match {
case Some(payload) => {
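Note the pruning logic added to handleUpdateMetadataRequest earlier in this file: deleted topics are
never named explicitly, the broker infers them as (topics in its metadataCache) minus (topics in the
controller's request). A simplified, self-contained sketch of that set-difference eviction (the value
type is a stand-in for PartitionStateInfo):

import scala.collection.mutable

object CachePruneSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  // remove every cached partition whose topic the controller no longer mentions
  def prune(cache: mutable.Map[TopicAndPartition, String],
            controllerView: Map[TopicAndPartition, String]): Set[String] = {
    val topicsKnownToThisBroker = cache.keySet.map(_.topic).toSet
    val topicsKnownToTheController = controllerView.keySet.map(_.topic)
    val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
    cache.keys.filter(tp => deletedTopics.contains(tp.topic)).toList.foreach(cache.remove)
    deletedTopics
  }

  def main(args: Array[String]): Unit = {
    val cache = mutable.Map(TopicAndPartition("a", 0) -> "info",
                            TopicAndPartition("b", 0) -> "info")
    println(prune(cache, Map(TopicAndPartition("a", 0) -> "info"))) // Set(b)
    println(cache.keySet)                                           // only topic a remains
  }
}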
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 9dca55c..8c69d09 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -20,7 +20,6 @@ package kafka.server
import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
-import kafka.common._
import java.net.InetAddress
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index b5719f8..19f61a9 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -19,7 +19,6 @@ package kafka.server
import scala.collection._
import kafka.utils.Logging
import kafka.common._
-import java.util.concurrent.locks.ReentrantLock
import java.io._
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f9d10d3..21bba48 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,7 +114,8 @@ class ReplicaManager(val config: KafkaConfig,
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
- stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+ stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
+ deletePartition.toString, topic, partitionId))
val errorCode = ErrorMapping.NoError
getPartition(topic, partitionId) match {
case Some(partition) =>
@@ -126,15 +127,18 @@ class ReplicaManager(val config: KafkaConfig,
if (removedPartition != null)
removedPartition.delete() // this will delete the local log
}
- case None => //do nothing if replica no longer exists
+ case None => //do nothing if replica no longer exists. This can happen during delete topic retries
+ stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
+ .format(localBrokerId, deletePartition, topic, partitionId))
}
- stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+ stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
+ .format(localBrokerId, deletePartition, topic, partitionId))
errorCode
}
- def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
+ def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = {
replicaStateChangeLock synchronized {
- val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+ val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short]
if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
.format(localBrokerId, stopReplicaRequest.controllerEpoch) +
@@ -142,14 +146,11 @@ class ReplicaManager(val config: KafkaConfig,
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
controllerEpoch = stopReplicaRequest.controllerEpoch
- val responseMap = new HashMap[(String, Int), Short]
// First stop fetchers for all partitions, then stop the corresponding replicas
- replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map {
- case (topic, partition) => TopicAndPartition(topic, partition)
- })
- for((topic, partitionId) <- stopReplicaRequest.partitions){
- val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
- responseMap.put((topic, partitionId), errorCode)
+ replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
+ for(topicAndPartition <- stopReplicaRequest.partitions){
+ val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
+ responseMap.put(topicAndPartition, errorCode)
}
(responseMap, ErrorMapping.NoError)
}
@@ -252,10 +253,10 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsTobeLeader = partitionState
.filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
- val partitionsTobeFollower = (partitionState -- partitionsTobeLeader.keys)
+ val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
- if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+ if (!partitionsToBeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
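stopReplica above now behaves differently for the three cases it can see: a live replica with
delete=true (remove it and delete its local log), a live replica with delete=false (stop it but keep
the log), and a replica that no longer exists, which is deliberately a no-op so that delete-topic
retries stay idempotent. A reduced sketch of that dispatch, with simplified stand-in types:

import scala.collection.mutable

object StopReplicaSketch {
  class Partition(val topic: String, val id: Int) {
    def delete(): Unit = println(s"deleting local log for [$topic,$id]")
  }

  private val allPartitions = mutable.Map.empty[(String, Int), Partition]

  def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
    allPartitions.get((topic, partitionId)) match {
      case Some(_) if deletePartition =>
        // remove the partition first, then delete its local log
        allPartitions.remove((topic, partitionId)).foreach(_.delete())
      case Some(_) => // delete=false: stop serving but keep the log
      case None =>
        // no-op: the replica may already be gone, e.g. during delete topic retries
        println(s"ignoring stop replica for [$topic,$partitionId]")
    }
    0 // ErrorMapping.NoError
  }
}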
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index 42e98dd..d41fd33 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -105,8 +105,13 @@ class TopicConfigManager(private val zkClient: ZkClient,
log.config = logConfig
lastExecutedChange = changeId
info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
- } else if (now - stat.getCtime > changeExpirationMs) {
- /* this change is now obsolete, try to delete it unless it is the last change left */
+ } else {
+ if (now - stat.getCtime > changeExpirationMs) {
+ /* this change is now obsolete, try to delete it unless it is the last change left */
+ error("Ignoring topic config change %d for topic %s since the change has expired".format(changeId, topic))
+ } else {
+ error("Ignoring topic config change %d for topic %s since the topic may have been deleted".format(changeId, topic))
+ }
ZkUtils.deletePath(zkClient, changeZnode)
}
}
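
[Editor's sketch] For context, these notifications are produced as sequential znodes under the config-changes path, which this manager later consumes, expiring stale entries as above. A hedged sketch of that producer side (both helpers are existing ZkUtils members; their use here describes the assumed flow, not code in this commit):

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    // Publish a change notification naming the topic; the manager applies the new
    // config if the topic's log exists, and deletes obsolete notifications.
    def publishConfigChange(zkClient: ZkClient, topic: String): String =
      ZkUtils.createSequentialPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath + "/config_change_", topic)
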
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index b189619..e5b6ff1 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,12 +17,12 @@
package kafka.server
import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, SystemTime, Logging}
+import kafka.utils.Utils._
+import kafka.utils.{Json, SystemTime, Logging}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
import kafka.controller.KafkaController
-import kafka.common.KafkaException
/**
* This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -44,7 +44,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
val leaderChangeListener = new LeaderChangeListener
def startup {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
elect
}
@@ -102,7 +102,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
*/
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
leaderId = KafkaController.parseControllerId(data.toString)
info("New leader is %d".format(leaderId))
}
@@ -115,7 +115,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
.format(brokerId, dataPath))
if(amILeader)
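
[Editor's sketch] The synchronized blocks above are replaced by inLock from kafka.utils.Utils. A minimal sketch of what such a helper looks like, inferred from the call sites (the committed implementation may differ in detail):

    import java.util.concurrent.locks.Lock

    // Acquire the lock, run the body, and release in a finally block so an
    // exception cannot leak a held lock, which is what the call sites rely on.
    def inLock[T](lock: Lock)(body: => T): T = {
      lock.lock()
      try body
      finally lock.unlock()
    }
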
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index b42e52b..fa86bb9 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -47,20 +47,24 @@ object ZkUtils extends Logging {
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
+ val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
- def getTopicPath(topic: String): String ={
+ def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
}
- def getTopicPartitionsPath(topic: String): String ={
+ def getTopicPartitionsPath(topic: String): String = {
getTopicPath(topic) + "/partitions"
}
def getTopicConfigPath(topic: String): String =
TopicConfigPath + "/" + topic
-
- def getController(zkClient: ZkClient): Int= {
+
+ def getDeleteTopicPath(topic: String): String =
+ DeleteTopicsPath + "/" + topic
+
+ def getController(zkClient: ZkClient): Int = {
readDataMaybeNull(zkClient, ControllerPath)._1 match {
case Some(controller) => KafkaController.parseControllerId(controller)
case None => throw new KafkaException("Controller doesn't exist")
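
[Editor's sketch] With the new DeleteTopicsPath, a topic deletion can be requested by creating a child znode named after the topic. A hedged sketch of that admin-side write (createPersistentPath is an existing ZkUtils helper; using it for the delete-topic flow is an assumption, not part of this hunk):

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    // Enqueue `topic` for deletion; the controller is expected to watch
    // /admin/delete_topics and drive the replica state machine from there.
    def markTopicForDeletion(zkClient: ZkClient, topic: String): Unit =
      ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
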
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 59de1b4..d5644ea 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.{Logging, ZkUtils, TestUtils}
import kafka.common.{TopicExistsException, TopicAndPartition}
import kafka.server.{KafkaServer, KafkaConfig}
import java.io.File
+import TestUtils._
class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@@ -161,9 +162,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}, 1000)
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
// in sync replicas should not have any replica that is not in the new assigned replicas
- checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
- ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
@@ -190,8 +191,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}, 1000)
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
- ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
@@ -218,8 +219,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}, 2000)
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
- ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
@@ -255,12 +256,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
reassignPartitionsCommand.reassignPartitions
// create brokers
val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b)))
- TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000)
+ TestUtils.waitUntilTrue(() => checkIfReassignPartitionPathExists(zkClient), 1000)
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
// ensure that there are no under replicated partitions
- ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
@@ -319,9 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
try {
// wait for the update metadata request to trickle to the brokers
assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
- activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+ activeServers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
assertEquals(0, partitionsRemaining.size)
- var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+ var partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
assertEquals(0, leaderAfterShutdown)
assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -330,15 +331,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
partitionsRemaining = controller.shutdownBroker(1)
assertEquals(0, partitionsRemaining.size)
activeServers = servers.filter(s => s.config.brokerId == 0)
- partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+ partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
assertEquals(0, leaderAfterShutdown)
- assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+ assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
partitionsRemaining = controller.shutdownBroker(0)
assertEquals(1, partitionsRemaining.size)
// leader doesn't change since all the replicas are shut down
- assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+ assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
}
finally {
servers.foreach(_.shutdown())
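
[Editor's sketch] The call sites above reflect a rename of apis.leaderCache to apis.metadataCache. A sketch of the apply-style lookup those call sites assume (shape inferred from usage only; the committed type may differ):

    import scala.collection.mutable
    import kafka.api.PartitionStateInfo
    import kafka.common.TopicAndPartition

    // A map-like cache supporting metadataCache(topicAndPartition), as used above.
    class MetadataCacheSketch {
      private val cache = mutable.Map.empty[TopicAndPartition, PartitionStateInfo]
      def apply(tp: TopicAndPartition): PartitionStateInfo = cache(tp)
      def put(tp: TopicAndPartition, info: PartitionStateInfo): Unit = cache.put(tp, info)
    }
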
@@ -389,27 +390,4 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
}
- private def checkForPhantomInSyncReplicas(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
- // in sync replicas should not have any replica that is not in the new assigned replicas
- val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
- assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
- phantomInSyncReplicas.size == 0)
- }
-
- private def ensureNoUnderReplicatedPartitions(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
- servers: Seq[KafkaServer]) {
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
- assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned),
- inSyncReplicas.size < assignedReplicas.size)
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
- assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned), leader.isDefined)
- val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
- assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get),
- leaderBroker.replicaManager.underReplicatedPartitionCount() == 0)
- }
-
- private def checkIfReassignPartitionPathExists(): Boolean = {
- ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
- }
}
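
[Editor's sketch] The private helpers deleted above now live in TestUtils and take the ZkClient explicitly, matching the import of TestUtils._ added earlier in this file. Their presumed post-move shape, reconstructed from the deleted bodies and the new call sites (signatures are assumptions):

    import org.I0Itec.zkclient.ZkClient
    import junit.framework.Assert._
    import kafka.utils.ZkUtils

    def checkForPhantomInSyncReplicas(zkClient: ZkClient, topic: String, partition: Int, assignedReplicas: Seq[Int]) {
      val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
      // The ISR must be a subset of the newly assigned replicas.
      val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
      assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
        phantomInSyncReplicas.isEmpty)
    }

    def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean =
      ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
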