Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/07 05:18:39 UTC
[3/3] git commit: KAFKA-330 Delete topic; reviewed by Jun Rao, Guozhang Wang and Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/167acb83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/167acb83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/167acb83
Branch: refs/heads/trunk
Commit: 167acb832d7f104eb2c344dcfd7b914c763d881d
Parents: fa6339c
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Feb 6 20:18:10 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Feb 6 20:18:24 2014 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 3 +-
.../main/scala/kafka/admin/TopicCommand.scala | 3 +-
.../kafka/api/ControlledShutdownResponse.scala | 4 +-
.../scala/kafka/api/LeaderAndIsrRequest.scala | 1 +
.../scala/kafka/api/StopReplicaRequest.scala | 22 +-
.../scala/kafka/api/StopReplicaResponse.scala | 18 +-
.../scala/kafka/api/UpdateMetadataRequest.scala | 1 +
.../controller/ControllerChannelManager.scala | 127 ++++---
.../kafka/controller/KafkaController.scala | 319 ++++++++++------
.../controller/PartitionLeaderSelector.scala | 16 +-
.../controller/PartitionStateMachine.scala | 106 +++++-
.../kafka/controller/ReplicaStateMachine.scala | 165 +++++---
.../kafka/controller/TopicDeletionManager.scala | 373 ++++++++++++++++++
.../scala/kafka/network/BlockingChannel.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 54 ++-
.../scala/kafka/server/KafkaHealthcheck.scala | 1 -
.../scala/kafka/server/OffsetCheckpoint.scala | 1 -
.../scala/kafka/server/ReplicaManager.scala | 29 +-
.../scala/kafka/server/TopicConfigManager.scala | 9 +-
.../kafka/server/ZookeeperLeaderElector.scala | 10 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 12 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 52 +--
.../unit/kafka/admin/DeleteTopicTest.scala | 377 +++++++++++++++++++
.../api/RequestResponseSerializationTest.scala | 6 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 4 +-
.../unit/kafka/server/OffsetCommitTest.scala | 32 +-
.../unit/kafka/server/SimpleFetchTest.scala | 11 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 27 +-
28 files changed, 1444 insertions(+), 341 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a167756..36ddeb4 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -140,8 +140,7 @@ object AdminUtils extends Logging {
}
def deleteTopic(zkClient: ZkClient, topic: String) {
- zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
- zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+ ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
}
def topicExists(zkClient: ZkClient, topic: String): Boolean =
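With this change, AdminUtils.deleteTopic no longer removes the topic and config znodes directly; it only marks the topic for deletion in ZooKeeper, and the controller's new TopicDeletionManager carries out the actual cleanup asynchronously. A minimal sketch of the new call path, assuming the ZKStringSerializer helper from kafka.utils and a placeholder ZooKeeper connection string:

  import org.I0Itec.zkclient.ZkClient
  import kafka.admin.AdminUtils
  import kafka.utils.ZKStringSerializer

  // Placeholder connection settings; substitute the real ZooKeeper quorum
  val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
  // Creates a persistent znode under the delete-topics admin path; the controller's
  // TopicDeletionManager watches that path and drives the actual deletion
  AdminUtils.deleteTopic(zkClient, "my-topic")
  zkClient.close()

This is also why TopicCommand (next hunk) now reports the topic as "queued for deletion" rather than deleted.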
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 842c110..65510eb 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -118,7 +118,7 @@ object TopicCommand {
val topics = getTopics(zkClient, opts)
topics.foreach { topic =>
AdminUtils.deleteTopic(zkClient, topic)
- println("Topic \"%s\" deleted.".format(topic))
+ println("Topic \"%s\" queued for deletion.".format(topic))
}
}
@@ -257,7 +257,6 @@ object TopicCommand {
val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
"if set when listing topics, only show topics that have overridden configs")
-
val options = parser.parse(args : _*)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index a80aa49..46ec3db 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -18,11 +18,9 @@
package kafka.api
import java.nio.ByteBuffer
-import collection.mutable.HashMap
-import collection.immutable.Map
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.ApiUtils._
-
+import collection.Set
object ControlledShutdownResponse {
def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index a984878..0311737 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -26,6 +26,7 @@ import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.common.ErrorMapping
import kafka.network.RequestChannel.Response
+import collection.Set
object LeaderAndIsr {
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 820f0f5..68fc138 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -17,13 +17,13 @@
package kafka.api
-
import java.nio._
import kafka.api.ApiUtils._
import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
+import collection.Set
object StopReplicaRequest extends Logging {
@@ -44,9 +44,9 @@ object StopReplicaRequest extends Logging {
throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
}
val topicPartitionPairCount = buffer.getInt
- val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
+ val topicPartitionPairSet = new collection.mutable.HashSet[TopicAndPartition]()
(1 to topicPartitionPairCount) foreach { _ =>
- topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
+ topicPartitionPairSet.add(TopicAndPartition(readShortString(buffer), buffer.getInt))
}
StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
deletePartitions, topicPartitionPairSet.toSet)
@@ -59,10 +59,10 @@ case class StopReplicaRequest(versionId: Short,
controllerId: Int,
controllerEpoch: Int,
deletePartitions: Boolean,
- partitions: Set[(String, Int)])
+ partitions: Set[TopicAndPartition])
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
- def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
+ def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
controllerId, controllerEpoch, deletePartitions, partitions)
}
@@ -75,9 +75,9 @@ case class StopReplicaRequest(versionId: Short,
buffer.putInt(controllerEpoch)
buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
buffer.putInt(partitions.size)
- for ((topic, partitionId) <- partitions){
- writeShortString(buffer, topic)
- buffer.putInt(partitionId)
+ for (topicAndPartition <- partitions) {
+ writeShortString(buffer, topicAndPartition.topic)
+ buffer.putInt(topicAndPartition.partition)
}
}
@@ -90,8 +90,8 @@ case class StopReplicaRequest(versionId: Short,
4 + /* controller epoch */
1 + /* deletePartitions */
4 /* partition count */
- for ((topic, partitionId) <- partitions){
- size += (ApiUtils.shortStringLength(topic)) +
+ for (topicAndPartition <- partitions){
+ size += (ApiUtils.shortStringLength(topicAndPartition.topic)) +
4 /* partition id */
}
size
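With the partitions field now typed as Set[TopicAndPartition] instead of Set[(String, Int)], callers build the request with explicit topic/partition objects. A minimal construction sketch using the secondary constructor above (all ids are placeholders for illustration):

  import kafka.api.StopReplicaRequest
  import kafka.common.TopicAndPartition

  // Ask the receiving broker to stop the replica and delete its partition data
  val request = new StopReplicaRequest(
    true,                                   // deletePartitions
    Set(TopicAndPartition("my-topic", 0)),  // partitions to stop
    1,                                      // controllerId (placeholder)
    7,                                      // controllerEpoch (placeholder)
    42)                                     // correlationId (placeholder)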
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index d7e3630..c90ddee 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -20,7 +20,7 @@ package kafka.api
import java.nio.ByteBuffer
import collection.mutable.HashMap
import collection.immutable.Map
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.ApiUtils._
@@ -30,12 +30,12 @@ object StopReplicaResponse {
val errorCode = buffer.getShort
val numEntries = buffer.getInt
- val responseMap = new HashMap[(String, Int), Short]()
+ val responseMap = new HashMap[TopicAndPartition, Short]()
for (i<- 0 until numEntries){
val topic = readShortString(buffer)
val partition = buffer.getInt
val partitionErrorCode = buffer.getShort()
- responseMap.put((topic, partition), partitionErrorCode)
+ responseMap.put(TopicAndPartition(topic, partition), partitionErrorCode)
}
new StopReplicaResponse(correlationId, responseMap.toMap, errorCode)
}
@@ -43,7 +43,7 @@ object StopReplicaResponse {
case class StopReplicaResponse(override val correlationId: Int,
- val responseMap: Map[(String, Int), Short],
+ val responseMap: Map[TopicAndPartition, Short],
val errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse(correlationId = correlationId) {
def sizeInBytes(): Int ={
@@ -53,7 +53,7 @@ case class StopReplicaResponse(override val correlationId: Int,
4 /* number of responses */
for ((key, value) <- responseMap) {
size +=
- 2 + key._1.length /* topic */ +
+ 2 + key.topic.length /* topic */ +
4 /* partition */ +
2 /* error code for this partition */
}
@@ -64,10 +64,10 @@ case class StopReplicaResponse(override val correlationId: Int,
buffer.putInt(correlationId)
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
- for ((key:(String, Int), value) <- responseMap){
- writeShortString(buffer, key._1)
- buffer.putInt(key._2)
- buffer.putShort(value)
+ for ((topicAndPartition, errorCode) <- responseMap){
+ writeShortString(buffer, topicAndPartition.topic)
+ buffer.putInt(topicAndPartition.partition)
+ buffer.putShort(errorCode)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 54dd7bd..543e262 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -22,6 +22,7 @@ import kafka.cluster.Broker
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
+import collection.Set
object UpdateMetadataRequest {
val CurrentVersion = 0.shortValue
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ea8485b..a1ee5a7 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -25,7 +25,10 @@ import kafka.server.KafkaConfig
import collection.mutable
import kafka.api._
import org.apache.log4j.Logger
+import scala.Some
import kafka.common.TopicAndPartition
+import kafka.api.RequestOrResponse
+import collection.Set
class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -118,10 +121,8 @@ class RequestSendThread(val controllerId: Int,
val queueItem = queue.take()
val request = queueItem._1
val callback = queueItem._2
-
var receive: Receive = null
-
- try{
+ try {
lock synchronized {
var isSendSuccessful = false
while(isRunning.get() && !isSendSuccessful) {
@@ -155,7 +156,7 @@ class RequestSendThread(val controllerId: Int,
stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s"
.format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString()))
- if(callback != null){
+ if(callback != null) {
callback(response)
}
}
@@ -180,12 +181,12 @@ class RequestSendThread(val controllerId: Int,
}
}
-class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
- controllerId: Int, clientId: String)
- extends Logging {
+class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
+ val controllerContext = controller.controllerContext
+ val controllerId: Int = controller.config.brokerId
+ val clientId: String = controller.clientId
val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
- val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
- val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+ val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -200,52 +201,47 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
if(updateMetadataRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
- if(stopAndDeleteReplicaRequestMap.size > 0)
- throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
- "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString()))
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
- replicas: Seq[Int]) {
- brokerIds.foreach { brokerId =>
+ replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) {
+ brokerIds.filter(b => b >= 0).foreach { brokerId =>
leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
leaderAndIsrRequestMap(brokerId).put((topic, partition),
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
}
- addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(TopicAndPartition(topic, partition)))
+ addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
}
- def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean) {
- brokerIds.foreach { brokerId =>
- stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
- stopAndDeleteReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
- if (deletePartition) {
- val v = stopAndDeleteReplicaRequestMap(brokerId)
- stopAndDeleteReplicaRequestMap(brokerId) = v :+ (topic, partition)
- }
- else {
- val v = stopReplicaRequestMap(brokerId)
- stopReplicaRequestMap(brokerId) = v :+ (topic, partition)
- }
+ def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
+ callback: (RequestOrResponse, Int) => Unit = null) {
+ brokerIds.filter(b => b >= 0).foreach { brokerId =>
+ stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
+ val v = stopReplicaRequestMap(brokerId)
+ if(callback != null)
+ stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
+ deletePartition, (r: RequestOrResponse) => { callback(r, brokerId) })
+ else
+ stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
+ deletePartition)
}
}
+ /* Send UpdateMetadataRequest to the given brokers for all partitions except those being deleted as part of delete topic
+ *
+ */
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
- partitions:scala.collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
- val partitionList =
- if(partitions.isEmpty) {
- controllerContext.partitionLeadershipInfo.keySet
- } else {
- partitions
- }
+ callback: (RequestOrResponse) => Unit = null) {
+ val partitionList = controllerContext.partitionLeadershipInfo.keySet.dropWhile(
+ p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
partitionList.foreach { partition =>
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
- brokerIds.foreach { brokerId =>
+ brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
}
@@ -269,7 +265,7 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
p._2.leaderIsrAndControllerEpoch, correlationId, broker,
p._1._1, p._1._2))
}
- sendRequest(broker, leaderAndIsrRequest, null)
+ controller.sendRequest(broker, leaderAndIsrRequest, null)
}
leaderAndIsrRequestMap.clear()
updateMetadataRequestMap.foreach { m =>
@@ -280,24 +276,23 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
"correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
correlationId, broker, p._1)))
- sendRequest(broker, updateMetadataRequest, null)
+ controller.sendRequest(broker, updateMetadataRequest, null)
}
updateMetadataRequestMap.clear()
- Seq((stopReplicaRequestMap, false), (stopAndDeleteReplicaRequestMap, true)) foreach {
- case(m, deletePartitions) => {
- m foreach {
- case(broker, replicas) =>
- if (replicas.size > 0) {
- debug("The stop replica request (delete = %s) sent to broker %d is %s"
- .format(deletePartitions, broker, replicas.mkString(",")))
- val stopReplicaRequest = new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas, controllerId,
- controllerEpoch, correlationId)
- sendRequest(broker, stopReplicaRequest, null)
- }
- }
- m.clear()
+ stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
+ val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
+ val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
+ debug("The stop replica request (delete = true) sent to broker %d is %s"
+ .format(broker, stopReplicaWithDelete.mkString(",")))
+ debug("The stop replica request (delete = false) sent to broker %d is %s"
+ .format(broker, stopReplicaWithoutDelete.mkString(",")))
+ replicaInfoList.foreach { r =>
+ val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
+ Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
+ controller.sendRequest(broker, stopReplicaRequest, r.callback)
}
}
+ stopReplicaRequestMap.clear()
}
}
@@ -306,3 +301,35 @@ case class ControllerBrokerStateInfo(channel: BlockingChannel,
messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
requestSendThread: RequestSendThread)
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: (RequestOrResponse) => Unit = null)
+
+class Callbacks private (var leaderAndIsrResponseCallback:(RequestOrResponse) => Unit = null,
+ var updateMetadataResponseCallback:(RequestOrResponse) => Unit = null,
+ var stopReplicaResponseCallback:(RequestOrResponse, Int) => Unit = null)
+
+object Callbacks {
+ class CallbackBuilder {
+ var leaderAndIsrResponseCbk:(RequestOrResponse) => Unit = null
+ var updateMetadataResponseCbk:(RequestOrResponse) => Unit = null
+ var stopReplicaResponseCbk:(RequestOrResponse, Int) => Unit = null
+
+ def leaderAndIsrCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
+ leaderAndIsrResponseCbk = cbk
+ this
+ }
+
+ def updateMetadataCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
+ updateMetadataResponseCbk = cbk
+ this
+ }
+
+ def stopReplicaCallback(cbk: (RequestOrResponse, Int) => Unit): CallbackBuilder = {
+ stopReplicaResponseCbk = cbk
+ this
+ }
+
+ def build: Callbacks = {
+ new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk)
+ }
+ }
+}
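The Callbacks/CallbackBuilder pair added above gives the controller a fluent way to attach per-request response handlers, for example so that the deletion logic can learn whether a StopReplicaRequest with delete=true succeeded. A minimal usage sketch, with a hypothetical handler name:

  import kafka.api.RequestOrResponse
  import kafka.controller.Callbacks.CallbackBuilder

  // Hypothetical handler invoked with the broker's response and its broker id
  def onStopReplicaResponse(response: RequestOrResponse, brokerId: Int): Unit = {
    // e.g. mark the replica deletion as successful or failed based on the error code
  }

  val callbacks = (new CallbackBuilder)
    .stopReplicaCallback(onStopReplicaResponse)
    .build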
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a0267ae..d812cb4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -17,42 +17,42 @@
package kafka.controller
import collection._
-import collection.immutable.Set
+import collection.Set
import com.yammer.metrics.core.Gauge
import java.lang.{IllegalStateException, Object}
import java.util.concurrent.TimeUnit
-import kafka.admin.{AdminOperationException, PreferredReplicaLeaderElectionCommand}
+import kafka.admin.PreferredReplicaLeaderElectionCommand
import kafka.api._
import kafka.cluster.Broker
import kafka.common._
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, ZkUtils, Logging, KafkaScheduler}
+import kafka.utils._
+import kafka.utils.Utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import java.util.concurrent.atomic.AtomicInteger
+import org.apache.log4j.Logger
import scala.Some
import kafka.common.TopicAndPartition
-import org.apache.log4j.Logger
+import java.util.concurrent.locks.ReentrantLock
class ControllerContext(val zkClient: ZkClient,
- val zkSessionTimeout: Int,
- var controllerChannelManager: ControllerChannelManager = null,
- val controllerLock: Object = new Object,
- var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
- val brokerShutdownLock: Object = new Object,
- var epoch: Int = KafkaController.InitialControllerEpoch - 1,
- var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
- val correlationId: AtomicInteger = new AtomicInteger(0),
- var allTopics: Set[String] = Set.empty,
- var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
- var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
- var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
- new mutable.HashMap,
- var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
- new mutable.HashSet) {
+ val zkSessionTimeout: Int) {
+ var controllerChannelManager: ControllerChannelManager = null
+ val controllerLock: ReentrantLock = new ReentrantLock()
+ var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
+ val brokerShutdownLock: Object = new Object
+ var epoch: Int = KafkaController.InitialControllerEpoch - 1
+ var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
+ val correlationId: AtomicInteger = new AtomicInteger(0)
+ var allTopics: Set[String] = Set.empty
+ var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
+ var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
+ var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+ var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -86,9 +86,37 @@ class ControllerContext(val zkClient: ZkClient,
}.flatten.toSet
}
+ def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
+ partitionReplicaAssignment
+ .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+ .map { case(topicAndPartition, replicas) =>
+ replicas.map { r =>
+ new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
+ }
+ }.flatten.toSet
+ }
+
+ def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
+ partitionReplicaAssignment
+ .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
+ }
+
def allLiveReplicas(): Set[PartitionAndReplica] = {
replicasOnBrokers(liveBrokerIds)
}
+
+ def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
+ partitions.map { p =>
+ val replicas = partitionReplicaAssignment(p)
+ replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
+ }.flatten
+ }
+
+ def removeTopic(topic: String) = {
+ partitionLeadershipInfo = partitionLeadershipInfo.dropWhile(p => p._1.topic.equals(topic))
+ partitionReplicaAssignment = partitionReplicaAssignment.dropWhile(p => p._1.topic.equals(topic))
+ allTopics -= topic
+ }
}
trait KafkaControllerMBean {
@@ -128,18 +156,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private var isRunning = true
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
- private val partitionStateMachine = new PartitionStateMachine(this)
- private val replicaStateMachine = new ReplicaStateMachine(this)
+ val partitionStateMachine = new PartitionStateMachine(this)
+ val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
onControllerResignation, config.brokerId)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
private val autoRebalanceScheduler = new KafkaScheduler(1)
+ var deleteTopicManager: TopicDeletionManager = null
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
- private val brokerRequestBatch = new ControllerBrokerRequestBatch(controllerContext, sendRequest, this.config.brokerId, this.clientId)
+ private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
registerControllerChangedListener()
newGauge(
@@ -153,7 +182,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
"OfflinePartitionsCount",
new Gauge[Int] {
def value(): Int = {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
if (!isActive())
0
else
@@ -167,7 +196,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
"PreferredReplicaImbalanceCount",
new Gauge[Int] {
def value(): Int = {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
if (!isActive())
0
else
@@ -200,7 +229,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.brokerShutdownLock synchronized {
info("Shutting down broker " + id)
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
@@ -211,7 +240,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
controllerContext.partitionsOnBroker(id)
.map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
}
@@ -219,7 +248,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
allPartitionsAndReplicationFactorOnBroker.foreach {
case(topicAndPartition, replicationFactor) =>
// Move leadership serially to relinquish lock.
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
// If the broker leads the topic partition, transition the leader and update isr. Updates zk and
@@ -231,7 +260,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// Stop the replica first. The state change below initiates ZK changes which should take some time
// before which the stop replica request should be completed (in most cases)
brokerRequestBatch.newBatch()
- brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
+ brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+ topicAndPartition.partition, deletePartition = false)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
// If the broker is a follower, updates the isr in ZK and notifies the current leader
@@ -242,7 +272,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
- def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
+ def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
controllerContext.partitionLeadershipInfo.filter {
case (topicAndPartition, leaderIsrAndControllerEpoch) =>
@@ -283,8 +313,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
Utils.registerMBean(this, KafkaController.MBeanName)
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
- initializeAndMaybeTriggerPartitionReassignment()
- initializeAndMaybeTriggerPreferredReplicaElection()
+ maybeTriggerPartitionReassignment()
+ maybeTriggerPreferredReplicaElection()
/* send partition leadership info to all live brokers */
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
@@ -293,6 +323,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
}
+ deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
@@ -303,7 +334,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* required to clean up internal controller data structures
*/
def onControllerResignation() {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
+ autoRebalanceScheduler.shutdown()
+ deleteTopicManager.shutdown()
Utils.unregisterMBean(KafkaController.MBeanName)
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
@@ -318,7 +351,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* Returns true if this broker is the current controller.
*/
def isActive(): Boolean = {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
controllerContext.controllerChannelManager != null
}
}
@@ -338,7 +371,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
*/
def onBrokerStartup(newBrokers: Seq[Int]) {
info("New broker startup callback for %s".format(newBrokers.mkString(",")))
-
val newBrokersSet = newBrokers.toSet
// send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
// leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
@@ -346,16 +378,25 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
sendUpdateMetadataRequest(newBrokers)
// the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
// supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
- replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(newBrokersSet), OnlineReplica)
+ val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
+ replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
// when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
// to see if these brokers can become leaders for some/all of those
partitionStateMachine.triggerOnlinePartitionStateChange()
// check if reassignment of some partitions need to be restarted
- val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{
- case (topicAndPartition, reassignmentContext) =>
- reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+ val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
+ case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
}
partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
+ // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
+ // on the newly restarted brokers, there is a chance that topic deletion can resume
+ val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+ if(replicasForTopicsToBeDeleted.size > 0) {
+ info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
+ "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
+ deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
+ deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
+ }
}
/**
@@ -371,20 +412,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
*/
def onBrokerFailure(deadBrokers: Seq[Int]) {
info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
-
val deadBrokersThatWereShuttingDown =
deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
-
val deadBrokersSet = deadBrokers.toSet
// trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
- deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
+ deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
+ !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()
+ // filter out the replicas that belong to topics that are being deleted
+ var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
+ val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
// handle dead replicas
- replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(deadBrokersSet), OfflineReplica)
+ replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
+ // check if topic deletion state for the dead replicas needs to be updated
+ val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+ if(replicasForTopicsToBeDeleted.size > 0) {
+ // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
+ // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
+ // since topic deletion cannot be retried if at least one replica is in TopicDeletionStarted state
+ deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
+ }
}
/**
@@ -401,7 +452,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
/**
- * This callback is invoked by the partition state machine's partition change listener with the list of new partitions.
+ * This callback is invoked by the topic change callback with the list of failed brokers as input.
* It does the following -
* 1. Move the newly created partitions to the NewPartition state
* 2. Move the newly created partitions from NewPartition->OnlinePartition state
@@ -409,9 +460,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
info("New partition creation callback for %s".format(newPartitions.mkString(",")))
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
- replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
+ replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
- replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica)
+ replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}
/**
@@ -493,8 +544,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
- //12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
- sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
+ //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
+ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+ // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
+ deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
}
}
@@ -528,6 +581,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// first register ISR change listener
watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+ // halt topic deletion for the partitions being reassigned
+ deleteTopicManager.haltTopicDeletion(Set(topic))
onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
} else {
// some replica in RAR is not alive. Fail partition reassignment
@@ -550,11 +605,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
+ deleteTopicManager.haltTopicDeletion(partitions.map(_.topic))
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
removePartitionsFromPreferredReplicaElection(partitions)
+ deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
}
}
@@ -564,7 +621,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* elector
*/
def startup() = {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
info("Controller starting up");
registerSessionExpirationListener()
isRunning = true
@@ -579,7 +636,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
*/
def shutdown() = {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
isRunning = false
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
@@ -633,6 +690,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
private def initializeControllerContext() {
+ // update controller cache with delete topic information
controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
@@ -642,42 +700,74 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
updateLeaderAndIsrCache()
// start the channel manager
startChannelManager()
+ initializePreferredReplicaElection()
+ initializePartitionReassignment()
+ initializeTopicDeletion()
info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
}
- private def initializeAndMaybeTriggerPartitionReassignment() {
+ private def initializePreferredReplicaElection() {
+ // initialize preferred replica election state
+ val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
+ // check if they are already completed or topic was deleted
+ val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
+ val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
+ val topicDeleted = replicasOpt.isEmpty
+ val successful =
+ if(!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
+ successful || topicDeleted
+ }
+ controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
+ controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
+ info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
+ info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
+ info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+ }
+
+ private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
- // check if they are already completed
- val reassignedPartitions = partitionsBeingReassigned.filter(partition =>
- controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1)
+ // check if they are already completed or topic was deleted
+ val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
+ val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
+ val topicDeleted = replicasOpt.isEmpty
+ val successful = if(!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
+ topicDeleted || successful
+ }.map(_._1)
reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
partitionsToReassign ++= partitionsBeingReassigned
partitionsToReassign --= reassignedPartitions
-
+ controllerContext.partitionsBeingReassigned ++= partitionsToReassign
info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
+ }
- partitionsToReassign.foreach { topicPartitionToReassign =>
+ private def initializeTopicDeletion() {
+ val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet
+ val replicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter(r =>
+ r._2.foldLeft(false)((res,r) => res || !controllerContext.liveBrokerIds.contains(r)))
+ val topicsWithReplicasOnDeadBrokers = replicasOnDeadBrokers.map(_._1.topic).toSet
+ val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
+ val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
+ val haltedTopicsForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
+ topicsForWhichPreferredReplicaElectionIsInProgress
+ info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
+ info("List of topics halted for deletion: %s".format(haltedTopicsForDeletion.mkString(",")))
+ // initialize the topic deletion manager
+ deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, haltedTopicsForDeletion)
+ }
+
+ private def maybeTriggerPartitionReassignment() {
+ controllerContext.partitionsBeingReassigned.foreach { topicPartitionToReassign =>
initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
}
}
- private def initializeAndMaybeTriggerPreferredReplicaElection() {
- // read the partitions undergoing preferred replica election from zookeeper path
- val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
- // check if they are already completed
- val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
- controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
- controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
- controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
- info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
- info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
- info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+ private def maybeTriggerPreferredReplicaElection() {
onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
}
@@ -736,13 +826,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
// first move the replica to offline state (the controller removes it from the ISR)
- oldReplicas.foreach { replica =>
- replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
- }
+ val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
// send stop replica command to the old replicas
- oldReplicas.foreach { replica =>
- replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), NonExistentReplica)
- }
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
+ // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
}
private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
@@ -838,22 +928,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
}
- private def getAllReplicasForPartition(partitions: Set[TopicAndPartition]): Set[PartitionAndReplica] = {
- partitions.map { p =>
- val replicas = controllerContext.partitionReplicaAssignment(p)
- replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
- }.flatten
- }
-
/**
* Send the leader information for selected partitions to selected brokers so that they can correctly respond to
* metadata requests
* @param brokers The brokers that the update metadata request should be sent to
- * @param partitions The partitions for which the metadata is to be sent
*/
- private def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+ def sendUpdateMetadataRequest(brokers: Seq[Int]) {
brokerRequestBatch.newBatch()
- brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
+ brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
}
@@ -979,7 +1061,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
@throws(classOf[Exception])
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
}
@@ -991,10 +1073,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
trace("checking need to trigger partition rebalance")
// get all the active brokers
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
- controllerContext.controllerLock synchronized {
- preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
- case(topicAndPartition, assignedReplicas) => assignedReplicas.head
- }
+ inLock(controllerContext.controllerLock) {
+ preferredReplicasForTopicsByBrokers =
+ controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
+ case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+ }
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
// for each broker, check if a preferred replica election needs to be triggered
@@ -1002,7 +1085,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
case(leaderBroker, topicAndPartitionsForBroker) => {
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
topicsNotInPreferredReplica =
topicAndPartitionsForBroker.filter {
case(topicPartition, replicas) => {
@@ -1018,7 +1101,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
// do this check only if the broker is live and there are no partitions being reassigned currently
// and preferred replica election is not in progress
if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
@@ -1070,11 +1153,19 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
.format(dataPath, data))
val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
- val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
- newPartitions.foreach { partitionToBeReassigned =>
- controllerContext.controllerLock synchronized {
- val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
- controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+ val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
+ partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
+ }
+ partitionsToBeReassigned.foreach { partitionToBeReassigned =>
+ inLock(controllerContext.controllerLock) {
+ if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
+ error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
+ .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
+ controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+ } else {
+ val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
+ controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+ }
}
}
}
@@ -1102,11 +1193,11 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
*/
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
- try {
- controllerContext.controllerLock synchronized {
- debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
+ inLock(controllerContext.controllerLock) {
+ debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
+ val topicAndPartition = TopicAndPartition(topic, partition)
+ try {
// check if this partition is still being reassigned or not
- val topicAndPartition = TopicAndPartition(topic, partition)
controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
case Some(reassignedPartitionContext) =>
// need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
@@ -1131,9 +1222,9 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
}
case None =>
}
+ } catch {
+ case e: Throwable => error("Error while handling partition reassignment", e)
}
- }catch {
- case e: Throwable => error("Error while handling partition reassignment", e)
}
}
@@ -1163,13 +1254,19 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
def handleDataChange(dataPath: String, data: Object) {
debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
.format(dataPath, data.toString))
- val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
-
- controllerContext.controllerLock synchronized {
- info("These partitions are already undergoing preferred replica election: %s"
- .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
- val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
- controller.onPreferredReplicaElection(newPartitions)
+ inLock(controllerContext.controllerLock) {
+ val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
+ if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
+ info("These partitions are already undergoing preferred replica election: %s"
+ .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+ val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+ val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+ if(partitionsForTopicsToBeDeleted.size > 0) {
+ error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
+ .format(partitionsForTopicsToBeDeleted))
+ }
+ else
+ controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
}
}
@@ -1194,7 +1291,7 @@ class ControllerEpochListener(controller: KafkaController) extends IZkDataListen
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
debug("Controller epoch listener fired with new epoch " + data.toString)
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
// read the epoch path to get the zk version
readControllerEpochFromZookeeper()
}
@@ -1222,7 +1319,11 @@ class ControllerEpochListener(controller: KafkaController) extends IZkDataListen
case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
-case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
+case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
+ override def toString(): String = {
+ "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
+ }
+}
case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
override def toString(): String = {
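
For reference, the new toString on PartitionAndReplica renders the triple in a bracketed Topic/Partition/Replica form, which makes controller and state-change logs easier to grep. A minimal stand-alone sketch, with the case class repeated here only for illustration:

    case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
      override def toString(): String =
        "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
    }

    object PartitionAndReplicaSketch extends App {
      // Replica 2 of partition 0 of a hypothetical topic "test"
      println(PartitionAndReplica("test", 0, 2))   // prints [Topic=test,Partition=0,Replica=2]
    }
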
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index fd9200f..fa29bbe 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -48,24 +48,24 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) =>
- val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+ val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
case true =>
debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
- .format(topicAndPartition, liveAssignedReplicasToThisPartition.mkString(",")))
- liveAssignedReplicasToThisPartition.isEmpty match {
+ .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
+ liveAssignedReplicas.isEmpty match {
case true =>
throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
ControllerStats.uncleanLeaderElectionRate.mark()
- val newLeader = liveAssignedReplicasToThisPartition.head
+ val newLeader = liveAssignedReplicas.head
warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
- .format(topicAndPartition, newLeader, liveAssignedReplicasToThisPartition.mkString(",")))
+ .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
}
case false =>
@@ -75,7 +75,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
- (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
+ (newLeaderAndIsr, liveAssignedReplicas)
case None =>
throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
}
@@ -106,10 +106,10 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
case None =>
reassignedInSyncReplicas.size match {
case 0 =>
- throw new StateChangeFailedException("List of reassigned replicas for partition " +
+ throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
- throw new StateChangeFailedException("None of the reassigned replicas for partition " +
+ throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
"%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
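
The OfflinePartitionLeaderSelector above prefers a live broker from the ISR and only falls back to a live assigned replica (an unclean election, with potential data loss) when no ISR member is alive; if nothing is alive it gives up with NoReplicaOnlineException. A stripped-down sketch of that decision, using plain collections in place of the controller context and LeaderAndIsr:

    object OfflineLeaderSelectionSketch extends App {
      // Hypothetical partition state
      val assignedReplicas = Seq(1, 2, 3)
      val isr              = Seq(2, 3)
      val liveBrokerIds    = Set(1)              // only broker 1 is currently alive

      val liveAssignedReplicas = assignedReplicas.filter(liveBrokerIds.contains)
      val liveBrokersInIsr     = isr.filter(liveBrokerIds.contains)

      val newLeader =
        if (liveBrokersInIsr.nonEmpty)
          liveBrokersInIsr.head                  // clean election from the ISR
        else if (liveAssignedReplicas.nonEmpty)
          liveAssignedReplicas.head              // unclean election: potential data loss
        else
          sys.error("no live replica")           // the real code throws NoReplicaOnlineException

      println("Elected leader: " + newLeader)    // prints: Elected leader: 1
    }
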
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ac4262a..487d4c8 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -26,6 +26,8 @@ import kafka.utils.{Logging, ZkUtils}
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.log4j.Logger
+import kafka.controller.Callbacks.CallbackBuilder
+import kafka.utils.Utils._
/**
* This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -44,8 +46,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
private val controllerId = controller.config.brokerId
private val zkClient = controllerContext.zkClient
var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
- val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
- controllerId, controller.clientId)
+ val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
private val hasStarted = new AtomicBoolean(false)
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
@@ -68,6 +69,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// register topic and partition change listeners
def registerListeners() {
registerTopicChangeListener()
+ registerDeleteTopicListener()
}
/**
@@ -85,10 +87,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
def triggerOnlinePartitionStateChange() {
try {
brokerRequestBatch.newBatch()
- // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
- for((topicAndPartition, partitionState) <- partitionState) {
+ // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
+ // that belong to topics to be deleted
+ for((topicAndPartition, partitionState) <- partitionState
+ if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
- handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
+ handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
+ (new CallbackBuilder).build)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
} catch {
@@ -97,18 +102,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
}
+ def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
+ partitionState.filter(p => p._2 == state).keySet
+ }
+
/**
* This API is invoked by the partition change zookeeper listener
* @param partitions The list of partitions that need to be transitioned to the target state
* @param targetState The state that the partitions should be moved to
*/
def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
- leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) {
+ leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
+ callbacks: Callbacks = (new CallbackBuilder).build) {
info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
try {
brokerRequestBatch.newBatch()
partitions.foreach { topicAndPartition =>
- handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
+ handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
}catch {
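
handleStateChanges and handleStateChange now also take a Callbacks argument, built with (new CallbackBuilder).build when no callbacks are needed, presumably so that callers such as the topic deletion manager can be notified once brokers respond to the resulting requests. The Callbacks class itself is not shown in this section; the sketch below is only a hypothetical illustration of that builder shape, with made-up names.

    // Hypothetical illustration of a builder-assembled callbacks holder; the
    // names below (ExampleCallbacks, stopReplicaCallback) are made up.
    case class ExampleCallbacks(onStopReplicaResponse: Option[Int => Unit] = None)

    class ExampleCallbackBuilder {
      private var callbacks = ExampleCallbacks()
      def stopReplicaCallback(cb: Int => Unit): ExampleCallbackBuilder = {
        callbacks = callbacks.copy(onStopReplicaResponse = Some(cb))
        this
      }
      def build: ExampleCallbacks = callbacks
    }

    object CallbacksSketch extends App {
      // Default, no-op callbacks, analogous to (new CallbackBuilder).build above
      val defaults = (new ExampleCallbackBuilder).build

      // Callbacks with a handler attached
      val withHandler = (new ExampleCallbackBuilder)
        .stopReplicaCallback(brokerId => println("StopReplica response from broker " + brokerId))
        .build
      withHandler.onStopReplicaResponse.foreach(cb => cb(1))
    }
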
@@ -131,7 +141,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
* --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
*
- * NewPartition,OnlinePartition -> OfflinePartition
+ * NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition
* --nothing other than marking partition state as Offline
*
* OfflinePartition -> NonExistentPartition
@@ -141,7 +151,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* @param targetState The end state that the partition should be moved to
*/
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
- leaderSelector: PartitionLeaderSelector) {
+ leaderSelector: PartitionLeaderSelector,
+ callbacks: Callbacks) {
val topicAndPartition = TopicAndPartition(topic, partition)
if (!hasStarted.get)
throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
@@ -178,7 +189,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// post: partition has a leader
case OfflinePartition =>
// pre: partition should be in New or Online state
- assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
+ assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// should be called when the leader for a partition is no longer alive
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Online to Offline"
.format(controllerId, controller.epoch, topicAndPartition))
@@ -354,6 +365,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
}
+ private def registerDeleteTopicListener() = {
+ zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, new DeleteTopicsListener())
+ }
+
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
val topicAndPartition = TopicAndPartition(topic, partition)
ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
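
registerDeleteTopicListener subscribes the new DeleteTopicsListener for child changes on ZkUtils.DeleteTopicsPath, so the controller is called back whenever a topic name is created under that path. A minimal stand-alone sketch of such a subscription with the I0Itec zkclient API; the connect string is a placeholder and the path is assumed to be /admin/delete_topics.

    import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

    object DeleteTopicsSubscriptionSketch extends App {
      val zkClient = new ZkClient("localhost:2181")      // placeholder connect string
      val deleteTopicsPath = "/admin/delete_topics"      // assumed value of ZkUtils.DeleteTopicsPath

      zkClient.subscribeChildChanges(deleteTopicsPath, new IZkChildListener {
        def handleChildChange(parentPath: String, children: java.util.List[String]) {
          println("Topics queued for deletion under %s: %s".format(parentPath, children))
        }
      })
    }
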
@@ -373,7 +388,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
try {
val currentChildren = {
@@ -383,7 +398,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
val newTopics = currentChildren -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- currentChildren
- // val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
controllerContext.allTopics = currentChildren
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
@@ -397,12 +411,62 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
} catch {
case e: Throwable => error("Error while handling new topic", e )
}
- // TODO: kafka-330 Handle deleted topics
}
}
}
}
+ /**
+ * Topic deletion includes the following operations -
+ * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists
+ * 2. If there are topics to be deleted, it signals the delete topic thread
+ */
+ class DeleteTopicsListener() extends IZkChildListener with Logging {
+ this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
+ val zkClient = controllerContext.zkClient
+
+ /**
+ * Invoked when a topic is being deleted
+ * @throws Exception On any error.
+ */
+ @throws(classOf[Exception])
+ def handleChildChange(parentPath : String, children : java.util.List[String]) {
+ inLock(controllerContext.controllerLock) {
+ var topicsToBeDeleted = {
+ import JavaConversions._
+ (children: Buffer[String]).toSet
+ }
+ debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
+ val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
+ if(nonExistentTopics.size > 0)
+ warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
+ topicsToBeDeleted --= nonExistentTopics
+ if(topicsToBeDeleted.size > 0) {
+ info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
+ // add topic to deletion list
+ controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
+ // halt if other state changes are in progress
+ topicsToBeDeleted.foreach { topic =>
+ val preferredReplicaElectionInProgress =
+ controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
+ val partitionReassignmentInProgress =
+ controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+ if(preferredReplicaElectionInProgress | partitionReassignmentInProgress)
+ controller.deleteTopicManager.haltTopicDeletion(Set(topic))
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @throws Exception
+ * On any error.
+ */
+ @throws(classOf[Exception])
+ def handleDataDeleted(dataPath: String) {
+ }
+ }
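
As described in the comment above the DeleteTopicsListener, the listener turns the child list of the delete-topics path into a set, warns about and drops names that are not known topics, and enqueues the rest with the topic deletion manager. A stand-alone sketch of just that filtering step, with hypothetical topic names:

    import scala.collection.JavaConversions._
    import scala.collection.mutable.Buffer

    object DeleteTopicsFilteringSketch extends App {
      // Children of the delete-topics znode, as delivered by the ZkClient callback
      val children: java.util.List[String] =
        java.util.Arrays.asList("orders", "clicks", "no-such-topic")
      // Topics the controller currently knows about
      val allTopics = Set("orders", "clicks", "payments")

      var topicsToBeDeleted = (children: Buffer[String]).toSet
      val nonExistentTopics = topicsToBeDeleted.filter(t => !allTopics.contains(t))
      if (nonExistentTopics.nonEmpty)
        println("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
      topicsToBeDeleted --= nonExistentTopics

      // The surviving topics (orders, clicks) would be handed to the deletion manager
      println("Topics queued for deletion: " + topicsToBeDeleted.mkString(","))
    }
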
class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
@@ -410,15 +474,21 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
@throws(classOf[Exception])
def handleDataChange(dataPath : String, data: Object) {
- controllerContext.controllerLock synchronized {
+ inLock(controllerContext.controllerLock) {
try {
info("Add Partition triggered " + data.toString + " for path " + dataPath)
val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
- val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p =>
+ val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
- info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
- if (partitionsRemainingToBeAdded.size > 0)
- controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
+ if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
+ error("Skipping adding partitions %s for topic %s since it is currently being deleted"
+ .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+ else {
+ if (partitionsToBeAdded.size > 0) {
+ info("New partitions to be added %s".format(partitionsToBeAdded))
+ controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
+ }
+ }
} catch {
case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
}