You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/05/06 19:38:43 UTC
git commit: kafka-1397; delete topic is not working;
patched by Timothy Chen; reviewed by Neha Narkhede and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 3f7a9061b -> 44c39c4ea
kafka-1397; delete topic is not working; patched by Timothy Chen; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44c39c4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44c39c4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44c39c4e
Branch: refs/heads/trunk
Commit: 44c39c4ea48f365445b8c08ce57d0f16b15f0d0b
Parents: 3f7a906
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue May 6 10:36:45 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue May 6 10:37:49 2014 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 18 +-
.../controller/ControllerChannelManager.scala | 14 +-
.../kafka/controller/KafkaController.scala | 8 +-
.../kafka/controller/ReplicaStateMachine.scala | 10 +-
.../kafka/controller/TopicDeletionManager.scala | 60 +-
core/src/main/scala/kafka/log/LogManager.scala | 1 -
.../scala/kafka/server/ReplicaManager.scala | 11 +-
.../scala/kafka/utils/ShutdownableThread.scala | 27 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 612 ++++++++-----------
.../kafka/server/DynamicConfigChangeTest.scala | 17 +-
.../unit/kafka/server/ServerShutdownTest.scala | 2 -
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +
12 files changed, 376 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 36ddeb4..b5d8714 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -87,7 +87,17 @@ object AdminUtils extends Logging {
ret.toMap
}
- def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
+
+ /**
+ * Add partitions to existing topic with optional replica assignment
+ *
+ * @param zkClient Zookeeper client
+ * @param topic Topic for adding partitions to
+ * @param numPartitions Number of partitions to be set
+ * @param replicaAssignmentStr Manual replica assignment
+ * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
+ */
+ def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", checkBrokerAvailable: Boolean = true) {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -102,7 +112,7 @@ object AdminUtils extends Logging {
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
else
- getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+ getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
@@ -117,7 +127,7 @@ object AdminUtils extends Logging {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
}
- def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
+ def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
var partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
@@ -128,7 +138,7 @@ object AdminUtils extends Logging {
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
- if (!brokerList.toSet.subsetOf(availableBrokerList))
+ if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
ret.put(partitionId, brokerList.toList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 919aeb2..8763968 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -206,13 +206,17 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) {
- brokerIds.filter(b => b >= 0).foreach { brokerId =>
- leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
- leaderAndIsrRequestMap(brokerId).put((topic, partition),
- PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
+ val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition)
+
+ brokerIds.filter(b => b >= 0).foreach {
+ brokerId =>
+ leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
+ leaderAndIsrRequestMap(brokerId).put((topic, partition),
+ PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
}
+
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
- Set(TopicAndPartition(topic, partition)))
+ Set(topicAndPartition))
}
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 933de9d..401bf1e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -337,11 +337,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* required to clean up internal controller data structures
*/
def onControllerResignation() {
+ if (deleteTopicManager != null)
+ deleteTopicManager.shutdown()
+
inLock(controllerContext.controllerLock) {
if (config.autoLeaderRebalanceEnable)
autoRebalanceScheduler.shutdown()
- if (deleteTopicManager != null)
- deleteTopicManager.shutdown()
+
Utils.unregisterMBean(KafkaController.MBeanName)
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
@@ -644,8 +646,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
def shutdown() = {
inLock(controllerContext.controllerLock) {
isRunning = false
- onControllerResignation()
}
+ onControllerResignation()
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 0e47dac..2f0f29d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -233,8 +233,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case Some(updatedLeaderIsrAndControllerEpoch) =>
// send the shrunk ISR state change request to all the remaining alive replicas of the partition.
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
- topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
+ if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
+ topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
+ }
replicaState.put(partitionAndReplica, OfflineReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
.format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
@@ -275,6 +277,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
replicaState.filter(r => r._1.topic.equals(topic) && r._2 == state).keySet
}
+ def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
+ replicaState.exists(r => r._1.topic.equals(topic) && r._2 == state)
+ }
+
def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index e4bc243..219c413 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicBoolean
class TopicDeletionManager(controller: KafkaController,
initialTopicsToBeDeleted: Set[String] = Set.empty,
initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
+ this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
val controllerContext = controller.controllerContext
val partitionStateMachine = controller.partitionStateMachine
val replicaStateMachine = controller.replicaStateMachine
@@ -81,14 +82,12 @@ class TopicDeletionManager(controller: KafkaController,
val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
var deleteTopicsThread: DeleteTopicsThread = null
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
- @volatile var isShuttingDown = false
/**
* Invoked at the end of new controller initiation
*/
def start() {
- if(isDeleteTopicEnabled) {
- isShuttingDown = false
+ if (isDeleteTopicEnabled) {
deleteTopicsThread = new DeleteTopicsThread()
deleteTopicStateChanged.set(true)
deleteTopicsThread.start()
@@ -96,17 +95,18 @@ class TopicDeletionManager(controller: KafkaController,
}
/**
- * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
+ * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
*/
def shutdown() {
- if(isDeleteTopicEnabled) {
- isShuttingDown = true
+ // Only allow one shutdown to go through
+ if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
+ // Resume the topic deletion so it doesn't block on the condition
resumeTopicDeletionThread()
- deleteTopicsThread.shutdown()
+ // Await delete topic thread to exit
+ deleteTopicsThread.awaitShutdown()
topicsToBeDeleted.clear()
partitionsToBeDeleted.clear()
topicsIneligibleForDeletion.clear()
- isShuttingDown = false
}
}
@@ -194,6 +194,13 @@ class TopicDeletionManager(controller: KafkaController,
false
}
+ def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition) = {
+ if(isDeleteTopicEnabled) {
+ partitionsToBeDeleted.contains(topicAndPartition)
+ } else
+ false
+ }
+
def isTopicQueuedUpForDeletion(topic: String): Boolean = {
if(isDeleteTopicEnabled) {
topicsToBeDeleted.contains(topic)
@@ -207,8 +214,8 @@ class TopicDeletionManager(controller: KafkaController,
*/
private def awaitTopicDeletionNotification() {
inLock(deleteLock) {
- while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) {
- info("Waiting for signal to start or continue topic deletion")
+ while(!deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) {
+ debug("Waiting for signal to start or continue topic deletion")
deleteTopicsCond.await()
}
}
@@ -257,6 +264,8 @@ class TopicDeletionManager(controller: KafkaController,
private def markTopicForDeletionRetry(topic: String) {
// reset replica states from ReplicaDeletionIneligible to OfflineReplica
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
+ info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
+ .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
}
@@ -281,7 +290,10 @@ class TopicDeletionManager(controller: KafkaController,
/**
* This callback is invoked by the DeleteTopics thread with the list of topics to be deleted
- * It invokes the delete partition callback for all partitions of a topic
+ * It invokes the delete partition callback for all partitions of a topic.
+ * The updateMetadataRequest is also going to set the leader for the topics being deleted to
+ * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
+ * removed from their caches.
*/
private def onTopicDeletion(topics: Set[String]) {
info("Topic deletion callback for %s".format(topics.mkString(",")))
@@ -305,10 +317,9 @@ class TopicDeletionManager(controller: KafkaController,
* As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
* the replicas a StopReplicaRequest (delete=true)
* This callback does the following things -
- * 1. Send metadata request to all brokers excluding the topics to be deleted
- * 2. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
+ * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
* for deletion if some replicas are dead since it won't complete successfully anyway
- * 3. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
+ * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
*@param replicasForTopicsToBeDeleted
*/
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
@@ -324,8 +335,10 @@ class TopicDeletionManager(controller: KafkaController,
debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
- if(deadReplicasForTopic.size > 0)
+ if(deadReplicasForTopic.size > 0) {
+ debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
markTopicIneligibleForDeletion(Set(topic))
+ }
}
}
@@ -365,12 +378,12 @@ class TopicDeletionManager(controller: KafkaController,
}
}
- class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) {
+ class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
val zkClient = controllerContext.zkClient
override def doWork() {
awaitTopicDeletionNotification()
- if(!isRunning.get)
+ if (!isRunning.get)
return
inLock(controllerContext.controllerLock) {
@@ -395,13 +408,12 @@ class TopicDeletionManager(controller: KafkaController,
partitions.mkString(","), topic))
} else {
// if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
- // TopicDeletionSuccessful. That means, there is at least one failed replica, which means topic deletion
- // should be retried
- val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
- // mark topic for deletion retry
- markTopicForDeletionRetry(topic)
- info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
- .format(topic, replicasInTopicDeletionFailedState.mkString(",")))
+ // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
+ // or there is at least one failed replica (which means topic deletion should be retried).
+ if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
+ // mark topic for deletion retry
+ markTopicForDeletionRetry(topic)
+ }
}
}
// Try delete topic if it is eligible for deletion.
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ac67f08..ab72cff 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -44,7 +44,6 @@ class LogManager(val logDirs: Array[File],
val retentionCheckMs: Long,
scheduler: Scheduler,
private val time: Time) extends Logging {
-
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 11c20ce..6a56a77 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -125,7 +125,16 @@ class ReplicaManager(val config: KafkaConfig,
if (removedPartition != null)
removedPartition.delete() // this will delete the local log
}
- case None => //do nothing if replica no longer exists. This can happen during delete topic retries
+ case None =>
+ // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
+ // This could happen when topic is being deleted while broker is down and recovers.
+ if(deletePartition) {
+ val topicAndPartition = TopicAndPartition(topic, partitionId)
+
+ if(logManager.getLog(topicAndPartition).isDefined) {
+ logManager.deleteLog(topicAndPartition)
+ }
+ }
stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
.format(localBrokerId, deletePartition, topic, partitionId))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index cf8adc9..fc226c8 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -27,20 +27,29 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
+ def shutdown() = {
+ initiateShutdown()
+ awaitShutdown()
+ }
- def shutdown(): Unit = {
- info("Shutting down")
- isRunning.set(false)
- if (isInterruptible)
- interrupt()
- shutdownLatch.await()
- info("Shutdown completed")
+ def initiateShutdown(): Boolean = {
+ if(isRunning.compareAndSet(true, false)) {
+ info("Shutting down")
+ isRunning.set(false)
+ if (isInterruptible)
+ interrupt()
+ true
+ } else
+ false
}
/**
- * After calling shutdown(), use this API to wait until the shutdown is complete
+ * After calling initiateShutdown(), use this API to wait until the shutdown is complete
*/
- def awaitShutdown(): Unit = shutdownLatch.await()
+ def awaitShutdown(): Unit = {
+ shutdownLatch.await()
+ info("Shutdown completed")
+ }
def doWork(): Unit
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 9c29e14..a821d60 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -33,376 +33,286 @@ import kafka.api.PartitionOffsetRequestInfo
class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
- /* Temporarily disable all tests until delete topic is fixed.
- * Add a fake test to let junit tests pass.
- */
@Test
- def testFake() {
+ def testDeleteTopicWithAllAliveReplicas() {
+ val topicAndPartition = TopicAndPartition("test", 0)
+ val topic = topicAndPartition.topic
+ val servers = createTestTopicAndCluster(topic)
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ verifyTopicDeletion(topic, servers)
+ servers.foreach(_.shutdown())
}
+ @Test
+ def testResumeDeleteTopicWithRecoveredFollower() {
+ val topicAndPartition = TopicAndPartition("test", 0)
+ val topic = topicAndPartition.topic
+ val servers = createTestTopicAndCluster(topic)
+ // shut down one follower replica
+ val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+ val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+ follower.shutdown()
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ // check if all replicas but the one that is shut down has deleted the log
+ TestUtils.waitUntilTrue(() =>
+ servers.filter(s => s.config.brokerId != follower.config.brokerId)
+ .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
+ // ensure topic deletion is halted
+ TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+ "Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
+ // restart follower replica
+ follower.startup()
+ verifyTopicDeletion(topic, servers)
+ servers.foreach(_.shutdown())
+ }
- /*
- @Test
- def testDeleteTopicWithAllAliveReplicas() {
- val topicAndPartition = TopicAndPartition("test", 0)
- val topic = topicAndPartition.topic
- val servers = createTestTopicAndCluster(topic)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- verifyTopicDeletion(topic, servers)
- servers.foreach(_.shutdown())
- }
+ @Test
+ def testResumeDeleteTopicOnControllerFailover() {
+ val topicAndPartition = TopicAndPartition("test", 0)
+ val topic = topicAndPartition.topic
+ val servers = createTestTopicAndCluster(topic)
+ val controllerId = ZkUtils.getController(zkClient)
+ val controller = servers.filter(s => s.config.brokerId == controllerId).head
+ var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && s.config.brokerId != controllerId).last
+ follower.shutdown()
- @Test
- def testResumeDeleteTopicWithRecoveredFollower() {
- val topicAndPartition = TopicAndPartition("test", 0)
- val topic = topicAndPartition.topic
- val servers = createTestTopicAndCluster(topic)
- // shut down one follower replica
- val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
- assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
- val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
- follower.shutdown()
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- // check if all replicas but the one that is shut down has deleted the log
- assertTrue("Replicas 0,1 have not deleted log in 1000ms", TestUtils.waitUntilTrue(() =>
- servers.filter(s => s.config.brokerId != follower.config.brokerId)
- .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), 1000))
- // ensure topic deletion is halted
- assertTrue("Admin path /admin/delete_topic/test path deleted in 1000ms even when a follower replica is down",
- TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500))
- // restart follower replica
- follower.startup()
- verifyTopicDeletion(topic, servers)
- servers.foreach(_.shutdown())
- }
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ // shut down the controller to trigger controller failover during delete topic
+ controller.shutdown()
- @Test
- def testResumeDeleteTopicOnControllerFailover() {
- val topicAndPartition = TopicAndPartition("test", 0)
- val topic = topicAndPartition.topic
- val servers = createTestTopicAndCluster(topic)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- // shut down the controller to trigger controller failover during delete topic
- val controllerId = ZkUtils.getController(zkClient)
- val controller = servers.filter(s => s.config.brokerId == controllerId).head
- controller.shutdown()
- // ensure topic deletion is halted
- assertTrue("Admin path /admin/delete_topic/test path deleted in 500ms even when a replica is down",
- TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500))
- // restart follower replica
- controller.startup()
- // wait until admin path for delete topic is deleted, signaling completion of topic deletion
- assertTrue("Admin path /admin/delete_topic/test path not deleted in 4000ms even after a follower replica is restarted",
- TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 4000))
- assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted",
- TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100))
- // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
- assertTrue("Replica logs not deleted after delete topic is complete",
- servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
- servers.foreach(_.shutdown())
- }
+ // ensure topic deletion is halted
+ TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+ "Admin path /admin/delete_topic/test path deleted even when a replica is down")
- @Test
- def testRequestHandlingDuringDeleteTopic() {
- val topicAndPartition = TopicAndPartition("test", 0)
- val topic = topicAndPartition.topic
- val servers = createTestTopicAndCluster(topic)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- // shut down one follower replica
- var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
- assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
- val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
- follower.shutdown()
- // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic
- val props1 = new Properties()
- props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
- props1.put("serializer.class", "kafka.serializer.StringEncoder")
- props1.put("request.required.acks", "1")
- val producerConfig1 = new ProducerConfig(props1)
- val producer1 = new Producer[String, String](producerConfig1)
- try{
- producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
- fail("Test should fail because the topic is being deleted")
- } catch {
- case e: FailedToSendMessageException =>
- case oe: Throwable => fail("fails with exception", oe)
- } finally {
- producer1.close()
- }
- // test if fetch requests fail during delete topic
- servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server =>
- val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "")
+ controller.startup()
+ follower.startup()
+
+ verifyTopicDeletion(topic, servers)
+ servers.foreach(_.shutdown())
+ }
+
+ @Test
+ def testRequestHandlingDuringDeleteTopic() {
+ val topicAndPartition = TopicAndPartition("test", 0)
+ val topic = topicAndPartition.topic
+ val servers = createTestTopicAndCluster(topic)
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ // shut down one follower replica
+ var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+ val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+ follower.shutdown()
+ // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic
+ val props1 = new Properties()
+ props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
+ props1.put("serializer.class", "kafka.serializer.StringEncoder")
+ props1.put("request.required.acks", "1")
+ val producerConfig1 = new ProducerConfig(props1)
+ val producer1 = new Producer[String, String](producerConfig1)
+ try {
+ producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
+ fail("Test should fail because the topic is being deleted")
+ } catch {
+ case e: FailedToSendMessageException =>
+ case oe: Throwable => fail("fails with exception", oe)
+ } finally {
+ producer1.close()
+ }
+ // test if fetch requests fail during delete topic
+ val availableServers: Seq[KafkaServer] = servers.filter(s => s.config.brokerId != follower.config.brokerId).toSeq
+ availableServers.foreach {
+ server =>
+ val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "")
val request = new FetchRequestBuilder()
.clientId("test-client")
.addFetch(topic, 0, 0, 10000)
.build()
val fetched = consumer.fetch(request)
val fetchResponse = fetched.data(topicAndPartition)
- assertTrue("Fetch should fail with UnknownTopicOrPartitionCode", fetchResponse.error == ErrorMapping.UnknownTopicOrPartitionCode)
- }
- // test if offset requests fail during delete topic
- servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server =>
- val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "")
+ assertEquals("Fetch should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.error)
+ }
+ // test if offset requests fail during delete topic
+ availableServers.foreach {
+ server =>
+ val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "")
val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error
- assertTrue("Offset request should fail with UnknownTopicOrPartitionCode", errorCode == ErrorMapping.UnknownTopicOrPartitionCode)
- }
- // restart follower replica
- follower.startup()
- verifyTopicDeletion(topic, servers)
- servers.foreach(_.shutdown())
- }
-
- @Test
- def testDeleteTopicDuringPreferredReplicaElection() {
- val topic = "test"
- val topicAndPartition = TopicAndPartition(topic, 0)
- val servers = createTestTopicAndCluster(topic)
- var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
- assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
- // shut down the controller to move the leader to a non preferred replica before delete topic
- val preferredReplicaId = 0
- val preferredReplica = servers.filter(s => s.config.brokerId == preferredReplicaId).head
- preferredReplica.shutdown()
- preferredReplica.startup()
- val newLeaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 3000, leaderIdOpt)
- assertTrue("New leader should be elected prior to delete topic", newLeaderIdOpt.isDefined)
- // test preferred replica election
- val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(topicAndPartition))
- preferredReplicaElection.moveLeaderToPreferredReplica()
- // start topic deletion during preferred replica election. This should halt topic deletion but eventually
- // complete it successfully
- AdminUtils.deleteTopic(zkClient, topic)
- val newControllerId = ZkUtils.getController(zkClient)
- val newController = servers.filter(s => s.config.brokerId == newControllerId).head
- assertTrue("Preferred replica election should succeed after 1000ms", TestUtils.waitUntilTrue(() =>
- !newController.kafkaController.controllerContext.partitionsUndergoingPreferredReplicaElection.contains(topicAndPartition), 1000))
- verifyTopicDeletion(topic, servers)
- servers.foreach(_.shutdown())
- }
-
- @Test
- def testPartitionReassignmentDuringDeleteTopic() {
- val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
- val topic = "test"
- val topicAndPartition = TopicAndPartition(topic, 0)
- val brokerConfigs = TestUtils.createBrokerConfigs(4)
- brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
- // create brokers
- val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
- val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
- // create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
- // wait until replica log is created on every broker
- assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
- res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
- var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
- assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
- // the topic is being deleted
- // reassign partition 0
- val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
- val newReplicas = Seq(1, 2, 3)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
- assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
- // wait until reassignment is completed
- TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
- ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
- }, 1000)
- val controllerId = ZkUtils.getController(zkClient)
- val controller = servers.filter(s => s.config.brokerId == controllerId).head
- assertFalse("Partition reassignment should fail",
- controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
- assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
- verifyTopicDeletion(topic, servers)
- allServers.foreach(_.shutdown())
- }
-
- @Test
- def testDeleteTopicDuringPartitionReassignment() {
- val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
- val topic = "test"
- val topicAndPartition = TopicAndPartition(topic, 0)
- val brokerConfigs = TestUtils.createBrokerConfigs(4)
- brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
- // create brokers
- val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
- val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
- // create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
- // wait until replica log is created on every broker
- assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
- res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
- var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
- assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
- // start partition reassignment at the same time right before delete topic. In this case, reassignment will succeed
- // reassign partition 0
- val newReplicas = Seq(1, 2, 3)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
- assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- // wait until reassignment is completed
- TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
- ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
- }, 1000)
- val controllerId = ZkUtils.getController(zkClient)
- val controller = servers.filter(s => s.config.brokerId == controllerId).head
- assertFalse("Partition reassignment should complete",
- controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
- assertEquals("Partition should be reassigned to 1,2,3", newReplicas, assignedReplicas)
- verifyTopicDeletion(topic, allServers)
- allServers.foreach(_.shutdown())
+ assertEquals("Offset request should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, errorCode)
}
+ // restart follower replica
+ follower.startup()
+ verifyTopicDeletion(topic, availableServers)
+ servers.foreach(_.shutdown())
+ }
- @Test
- def testDeleteTopicDuringAddPartition() {
- val topic = "test"
- val servers = createTestTopicAndCluster(topic)
- val newPartition = TopicAndPartition(topic, 1)
- // add partitions to topic
- AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- // test if topic deletion is resumed
- verifyTopicDeletion(topic, servers)
- // verify that new partition doesn't exist on any broker either
- assertTrue("Replica logs not for new partition [test,1] not deleted after delete topic is complete", TestUtils.waitUntilTrue(() =>
- servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), 1000))
- servers.foreach(_.shutdown())
- }
+ @Test
+ def testPartitionReassignmentDuringDeleteTopic() {
+ val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+ val topic = "test"
+ val topicAndPartition = TopicAndPartition(topic, 0)
+ val brokerConfigs = TestUtils.createBrokerConfigs(4)
+ brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+ // create brokers
+ val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
+ val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
+ // create the topic
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ // wait until replica log is created on every broker
+ TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+ res && server.getLogManager().getLog(topicAndPartition).isDefined),
+ "Replicas for topic test not created.")
+ var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+ val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+ follower.shutdown()
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
+ // the topic is being deleted
+ // reassign partition 0
+ val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+ val newReplicas = Seq(1, 2, 3)
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+ assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
+ // wait until reassignment is completed
+ TestUtils.waitUntilTrue(() => {
+ val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
+ ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+ Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
+ }, "Partition reassignment shouldn't complete.")
+ val controllerId = ZkUtils.getController(zkClient)
+ val controller = servers.filter(s => s.config.brokerId == controllerId).head
+ assertFalse("Partition reassignment should fail",
+ controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
+ val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+ assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
+ follower.startup()
+ verifyTopicDeletion(topic, servers)
+ allServers.foreach(_.shutdown())
+ }
- @Test
- def testAddPartitionDuringDeleteTopic() {
- val topic = "test"
- val topicAndPartition = TopicAndPartition(topic, 0)
- val servers = createTestTopicAndCluster(topic)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- // add partitions to topic
- val newPartition = TopicAndPartition(topic, 1)
- AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
- verifyTopicDeletion(topic, servers)
- // verify that new partition doesn't exist on any broker either
- assertTrue("Replica logs not deleted after delete topic is complete",
- servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
- servers.foreach(_.shutdown())
- }
+ @Test
+ def testDeleteTopicDuringAddPartition() {
+ val topic = "test"
+ val servers = createTestTopicAndCluster(topic)
+ var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+ val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+ val newPartition = TopicAndPartition(topic, 1)
+ follower.shutdown()
+ // add partitions to topic
+ AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2", false)
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ follower.startup()
+ // test if topic deletion is resumed
+ verifyTopicDeletion(topic, servers)
+ // verify that new partition doesn't exist on any broker either
+ TestUtils.waitUntilTrue(() =>
+ servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
+ "Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
+ servers.foreach(_.shutdown())
+ }
- @Test
- def testRecreateTopicAfterDeletion() {
- val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
- val topic = "test"
- val topicAndPartition = TopicAndPartition(topic, 0)
- val servers = createTestTopicAndCluster(topic)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- verifyTopicDeletion(topic, servers)
- // re-create topic on same replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
- // wait until leader is elected
- val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
- assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
- // check if all replica logs are created
- assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
- res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
- servers.foreach(_.shutdown())
- }
+ @Test
+ def testAddPartitionDuringDeleteTopic() {
+ val topic = "test"
+ val topicAndPartition = TopicAndPartition(topic, 0)
+ val servers = createTestTopicAndCluster(topic)
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ // add partitions to topic
+ val newPartition = TopicAndPartition(topic, 1)
+ AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
+ verifyTopicDeletion(topic, servers)
+ // verify that new partition doesn't exist on any broker either
+ assertTrue("Replica logs not deleted after delete topic is complete",
+ servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
+ servers.foreach(_.shutdown())
+ }
- @Test
- def testTopicConfigChangesDuringDeleteTopic() {
- val topic = "test"
- val servers = createTestTopicAndCluster(topic)
- val topicConfigs = new Properties()
- topicConfigs.put("segment.ms", "1000000")
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- verifyTopicDeletion(topic, servers)
- // make topic config changes
- try {
- AdminUtils.changeTopicConfig(zkClient, topic, topicConfigs)
- fail("Should fail with AdminOperationException for topic doesn't exist")
- } catch {
- case e: AdminOperationException => // expected
- }
- servers.foreach(_.shutdown())
- }
+ @Test
+ def testRecreateTopicAfterDeletion() {
+ val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+ val topic = "test"
+ val topicAndPartition = TopicAndPartition(topic, 0)
+ val servers = createTestTopicAndCluster(topic)
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ verifyTopicDeletion(topic, servers)
+ // re-create topic on same replicas
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ // wait until leader is elected
+ val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+ assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+ // check if all replica logs are created
+ TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined),
+ "Replicas for topic test not created.")
+ servers.foreach(_.shutdown())
+ }
- @Test
- def testAutoCreateAfterDeleteTopic() {
- val topicAndPartition = TopicAndPartition("test", 0)
- val topic = topicAndPartition.topic
- val servers = createTestTopicAndCluster(topic)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- verifyTopicDeletion(topic, servers)
- // test if first produce request after topic deletion auto creates the topic
- val props = new Properties()
- props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- props.put("producer.type", "sync")
- props.put("request.required.acks", "1")
- props.put("message.send.max.retries", "1")
- val producerConfig = new ProducerConfig(props)
- val producer = new Producer[String, String](producerConfig)
- try{
- producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
- } catch {
- case e: FailedToSendMessageException => fail("Topic should have been auto created")
- case oe: Throwable => fail("fails with exception", oe)
- }
- // test the topic path exists
- assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
- // wait until leader is elected
- val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
- assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
- try {
- producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
- } catch {
- case e: FailedToSendMessageException => fail("Topic should have been auto created")
- case oe: Throwable => fail("fails with exception", oe)
- } finally {
- producer.close()
- }
- servers.foreach(_.shutdown())
+ @Test
+ def testAutoCreateAfterDeleteTopic() {
+ val topicAndPartition = TopicAndPartition("test", 0)
+ val topic = topicAndPartition.topic
+ val servers = createTestTopicAndCluster(topic)
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, topic)
+ verifyTopicDeletion(topic, servers)
+ // test if first produce request after topic deletion auto creates the topic
+ val props = new Properties()
+ props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("producer.type", "sync")
+ props.put("request.required.acks", "1")
+ props.put("message.send.max.retries", "1")
+ val producerConfig = new ProducerConfig(props)
+ val producer = new Producer[String, String](producerConfig)
+ try {
+ producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
+ } catch {
+ case e: FailedToSendMessageException => fail("Topic should have been auto created")
+ case oe: Throwable => fail("fails with exception", oe)
}
+ // test the topic path exists
+ assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+ // wait until leader is elected
+ val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+ assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+ servers.foreach(_.shutdown())
+ }
- @Test
- def testDeleteNonExistingTopic() {
- val topicAndPartition = TopicAndPartition("test", 0)
- val topic = topicAndPartition.topic
- val servers = createTestTopicAndCluster(topic)
- // start topic deletion
- AdminUtils.deleteTopic(zkClient, "test2")
- // verify delete topic path for test2 is removed from zookeeper
- verifyTopicDeletion("test2", servers)
- // verify that topic test is untouched
- assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
- res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
- // test the topic path exists
- assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
- // topic test should have a leader
- val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
- assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
- servers.foreach(_.shutdown())
+ @Test
+ def testDeleteNonExistingTopic() {
+ val topicAndPartition = TopicAndPartition("test", 0)
+ val topic = topicAndPartition.topic
+ val servers = createTestTopicAndCluster(topic)
+ // start topic deletion
+ AdminUtils.deleteTopic(zkClient, "test2")
+ // verify delete topic path for test2 is removed from zookeeper
+ verifyTopicDeletion("test2", servers)
+ // verify that topic test is untouched
+ TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+ res && server.getLogManager().getLog(topicAndPartition).isDefined),
+ "Replicas for topic test not created")
+ // test the topic path exists
+ assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+ // topic test should have a leader
+ val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+ assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
+ servers.foreach(_.shutdown())
- }
+ }
private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
- val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+ val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topicAndPartition = TopicAndPartition(topic, 0)
val brokerConfigs = TestUtils.createBrokerConfigs(3)
brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
@@ -411,21 +321,21 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
- assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
- res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
+ TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+ res && server.getLogManager().getLog(topicAndPartition).isDefined),
+ "Replicas for topic test not created")
servers
}
private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) {
val topicAndPartition = TopicAndPartition(topic, 0)
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
- assertTrue("Admin path /admin/delete_topic/test path not deleted in 1000ms even after a replica is restarted",
- TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 1000))
- assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted",
- TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100))
+ TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+ "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
+ TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
+ "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
// ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
assertTrue("Replica logs not deleted after delete topic is complete",
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
}
- */
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 5c48796..ad12116 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -17,14 +17,12 @@
package kafka.server
import junit.framework.Assert._
-import java.util.Properties
-import java.io.File
-import org.junit.{After, Before, Test}
+import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
import kafka.common._
import kafka.log.LogConfig
-import kafka.admin.AdminUtils
+import kafka.admin.{AdminOperationException, AdminUtils}
import org.scalatest.junit.JUnit3Suite
class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -48,4 +46,15 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
}
}
+ @Test
+ def testConfigChangeOnNonExistingTopic() {
+ val topic = TestUtils.tempTopic
+ try {
+ AdminUtils.changeTopicConfig(zkClient, topic, LogConfig(flushInterval = 10000).toProps)
+ fail("Should fail with AdminOperationException for topic doesn't exist")
+ } catch {
+ case e: AdminOperationException => // expected
+ }
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 014e964..ab60e9b 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -101,7 +101,6 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
verifyNonDaemonThreadsStatus
}
- /* Temporarily disable the test until delete topic is fixed.
@Test
def testCleanShutdownWithDeleteTopicEnabled() {
val newProps = TestUtils.createBrokerConfig(0, port)
@@ -114,7 +113,6 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
Utils.rm(server.config.logDirs)
verifyNonDaemonThreadsStatus
}
- */
def verifyNonDaemonThreadsStatus() {
assertEquals(0, Thread.getAllStackTraces.keySet().toArray
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 00bfba4..034f361 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -87,6 +87,8 @@ object TestUtils extends Logging {
f
}
+ def tempTopic(): String = "testTopic" + random.nextInt(1000000)
+
/**
* Create a temporary relative directory
*/