You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/10 20:59:08 UTC
kafka git commit: KAFKA-1476 Added a ConsumerCommand tool that will
replace other consumer related tools in the future; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 71602de0b -> 9fe9913e9
KAFKA-1476 Added a ConsumerCommand tool that will replace other consumer related tools in the future; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fe9913e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fe9913e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fe9913e
Branch: refs/heads/trunk
Commit: 9fe9913e95e1d3e114c74620d8da40f804f71b18
Parents: 71602de
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue Feb 10 11:58:54 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 10 11:59:01 2015 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 56 +++-
.../kafka/admin/ConsumerGroupCommand.scala | 310 +++++++++++++++++++
core/src/main/scala/kafka/utils/ZkUtils.scala | 26 +-
.../kafka/admin/DeleteConsumerGroupTest.scala | 212 +++++++++++++
.../unit/kafka/admin/DeleteTopicTest.scala | 52 +---
.../test/scala/unit/kafka/utils/TestUtils.scala | 33 +-
6 files changed, 641 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 28b12c7..b700110 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,7 +20,7 @@ package kafka.admin
import kafka.common._
import kafka.cluster.Broker
import kafka.log.LogConfig
-import kafka.utils.{Logging, ZkUtils, Json}
+import kafka.utils._
import kafka.api.{TopicMetadata, PartitionMetadata}
import java.util.Random
@@ -164,6 +164,60 @@ object AdminUtils extends Logging {
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
}
+ def isConsumerGroupActive(zkClient: ZkClient, group: String) = {
+ ZkUtils.getConsumersInGroup(zkClient, group).nonEmpty
+ }
+
+ /**
+ * Delete the whole directory of the given consumer group if the group is inactive.
+ *
+ * @param zkClient Zookeeper client
+ * @param group Consumer group
+ * @return whether or not we deleted the consumer group information
+ */
+ def deleteConsumerGroupInZK(zkClient: ZkClient, group: String) = {
+ if (!isConsumerGroupActive(zkClient, group)) {
+ val dir = new ZKGroupDirs(group)
+ ZkUtils.deletePathRecursive(zkClient, dir.consumerGroupDir)
+ true
+ }
+ else false
+ }
+
+ /**
+ * Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
+ * If the consumer group consumes no other topics, delete the whole consumer group directory.
+ *
+ * @param zkClient Zookeeper client
+ * @param group Consumer group
+ * @param topic Topic of the consumer group information we wish to delete
+ * @return whether or not we deleted the consumer group information for the given topic
+ */
+ def deleteConsumerGroupInfoForTopicInZK(zkClient: ZkClient, group: String, topic: String) = {
+ val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+ if (topics == Seq(topic)) {
+ deleteConsumerGroupInZK(zkClient, group)
+ }
+ else if (!isConsumerGroupActive(zkClient, group)) {
+ val dir = new ZKGroupTopicDirs(group, topic)
+ ZkUtils.deletePathRecursive(zkClient, dir.consumerOwnerDir)
+ ZkUtils.deletePathRecursive(zkClient, dir.consumerOffsetDir)
+ true
+ }
+ else false
+ }
+
+ /**
+ * Delete every inactive consumer group's information about the given topic in Zookeeper.
+ *
+ * @param zkClient Zookeeper client
+ * @param topic Topic of the consumer group information we wish to delete
+ */
+ def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) {
+ val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic)
+ groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+ }
+
def topicExists(zkClient: ZkClient, topic: String): Boolean =
zkClient.exists(ZkUtils.getTopicPath(topic))
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
new file mode 100644
index 0000000..89fa29a
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import kafka.common._
+import java.util.Properties
+import kafka.client.ClientUtils
+import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest}
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+import kafka.common.TopicAndPartition
+import joptsimple.{OptionSpec, OptionParser}
+import scala.collection.{Set, mutable}
+import kafka.consumer.SimpleConsumer
+import collection.JavaConversions._
+
+
+object ConsumerGroupCommand {
+
+ def main(args: Array[String]) {
+ val opts = new ConsumerGroupCommandOptions(args)
+
+ if(args.length == 0)
+ CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
+
+ // should have exactly one action
+ val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
+ if(actions != 1)
+ CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
+
+ opts.checkArgs()
+
+ val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+
+ try {
+ if (opts.options.has(opts.listOpt))
+ list(zkClient)
+ else if (opts.options.has(opts.describeOpt))
+ describe(zkClient, opts)
+ else if (opts.options.has(opts.deleteOpt))
+ delete(zkClient, opts)
+ } catch {
+ case e: Throwable =>
+ println("Error while executing consumer group command " + e.getMessage)
+ println(Utils.stackTrace(e))
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ def list(zkClient: ZkClient) {
+ ZkUtils.getConsumerGroups(zkClient).foreach(println)
+ }
+
+ def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ val configs = parseConfigs(opts)
+ val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
+ val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
+ val group = opts.options.valueOf(opts.groupOpt)
+ val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+ if (topics.isEmpty) {
+ println("No topic available for consumer group provided")
+ }
+ topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
+ }
+
+ def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) {
+ deleteForTopic(zkClient, opts)
+ }
+ else if (opts.options.has(opts.groupOpt)) {
+ deleteForGroup(zkClient, opts)
+ }
+ else if (opts.options.has(opts.topicOpt)) {
+ deleteAllForTopic(zkClient, opts)
+ }
+ }
+
+ private def deleteForGroup(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ val groups = opts.options.valuesOf(opts.groupOpt)
+ groups.foreach { group =>
+ try {
+ if (AdminUtils.deleteConsumerGroupInZK(zkClient, group))
+ println("Deleted all consumer group information for group %s in zookeeper.".format(group))
+ else
+ println("Delete for group %s failed because its consumers are still active.".format(group))
+ }
+ catch {
+ case e: ZkNoNodeException =>
+ println("Delete for group %s failed because group does not exist.".format(group))
+ }
+ }
+ }
+
+ private def deleteForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ val groups = opts.options.valuesOf(opts.groupOpt)
+ val topic = opts.options.valueOf(opts.topicOpt)
+ Topic.validate(topic)
+ groups.foreach { group =>
+ try {
+ if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+ println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
+ else
+ println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
+ }
+ catch {
+ case e: ZkNoNodeException =>
+ println("Delete for group %s topic %s failed because group does not exist.".format(group, topic))
+ }
+ }
+ }
+
+ private def deleteAllForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ val topic = opts.options.valueOf(opts.topicOpt)
+ Topic.validate(topic)
+ AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+ println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
+ }
+
+ private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
+ val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
+ require(configsToBeAdded.forall(config => config.length == 2),
+ "Invalid config: all configs to be added must be in the format \"key=val\".")
+ val props = new Properties
+ configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+ props
+ }
+
+ private def describeTopic(zkClient: ZkClient,
+ group: String,
+ topic: String,
+ channelSocketTimeoutMs: Int,
+ channelRetryBackoffMs: Int) {
+ val topicPartitions = getTopicPartitions(zkClient, topic)
+ val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
+ println("%s, %s, %s, %s, %s, %s, %s"
+ .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
+ topicPartitions
+ .sortBy { case topicPartition => topicPartition.partition }
+ .foreach { topicPartition =>
+ describePartition(zkClient, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
+ }
+ }
+
+ private def getTopicPartitions(zkClient: ZkClient, topic: String) = {
+ val topicPartitionMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
+ val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
+ partitions.map(TopicAndPartition(topic, _))
+ }
+
+ private def getPartitionOffsets(zkClient: ZkClient,
+ group: String,
+ topicPartitions: Seq[TopicAndPartition],
+ channelSocketTimeoutMs: Int,
+ channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
+ val offsetMap = mutable.Map[TopicAndPartition, Long]()
+ val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
+ channel.send(OffsetFetchRequest(group, topicPartitions))
+ val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
+
+ offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
+ if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
+ val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
+ // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
+ // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
+ try {
+ val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
+ offsetMap.put(topicAndPartition, offset)
+ } catch {
+ case z: ZkNoNodeException =>
+ println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
+ .format(group, topicAndPartition))
+ }
+ }
+ else if (offsetAndMetadata.error == ErrorMapping.NoError)
+ offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
+ else
+ println("Could not fetch offset from kafka for group %s partition %s due to %s."
+ .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+ }
+ channel.disconnect()
+ offsetMap.toMap
+ }
+
+ private def describePartition(zkClient: ZkClient,
+ group: String,
+ topic: String,
+ partition: Int,
+ offsetOpt: Option[Long]) {
+ val topicAndPartition = TopicAndPartition(topic, partition)
+ val groupDirs = new ZKGroupTopicDirs(group, topic)
+ val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1
+ ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
+ case Some(-1) =>
+ println("%s, %s, %s, %s, %s, %s, %s"
+ .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
+ case Some(brokerId) =>
+ val consumerOpt = getConsumer(zkClient, brokerId)
+ consumerOpt match {
+ case Some(consumer) =>
+ val request =
+ OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+ val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+ consumer.close()
+
+ val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
+ println("%s, %s, %s, %s, %s, %s, %s"
+ .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
+ case None => // ignore
+ }
+ case None =>
+ println("No broker for partition %s".format(topicAndPartition))
+ }
+ }
+
+ private def getConsumer(zkClient: ZkClient, brokerId: Int): Option[SimpleConsumer] = {
+ try {
+ ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+ case Some(brokerInfoString) =>
+ Json.parseFull(brokerInfoString) match {
+ case Some(m) =>
+ val brokerInfo = m.asInstanceOf[Map[String, Any]]
+ val host = brokerInfo.get("host").get.asInstanceOf[String]
+ val port = brokerInfo.get("port").get.asInstanceOf[Int]
+ Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
+ case None =>
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+ }
+ case None =>
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+ }
+ } catch {
+ case t: Throwable =>
+ println("Could not parse broker info due to " + t.getMessage)
+ None
+ }
+ }
+
+ class ConsumerGroupCommandOptions(args: Array[String]) {
+ val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over."
+ val GroupDoc = "The consumer group we wish to act on."
+ val TopicDoc = "The topic whose consumer group information should be deleted."
+ val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
+ val ListDoc = "List all consumer groups."
+ val DescribeDoc = "Describe consumer group and list offset lag related to given group."
+ val nl = System.getProperty("line.separator")
+ val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " +
+ "over the entire consumer group. For instance --group g1 --group g2" + nl +
+ "Pass in groups with a single topic to just delete the given topic's partition offsets and ownership " +
+ "information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl +
+ "Pass in just a topic to delete the given topic's partition offsets and ownership information " +
+ "for every consumer group. For instance --topic t1" + nl +
+ "WARNING: Only does deletions on consumer groups that are not active."
+ val parser = new OptionParser
+ val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
+ val groupOpt = parser.accepts("group", GroupDoc)
+ .withRequiredArg
+ .describedAs("consumer group")
+ .ofType(classOf[String])
+ val topicOpt = parser.accepts("topic", TopicDoc)
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val configOpt = parser.accepts("config", ConfigDoc)
+ .withRequiredArg
+ .describedAs("name=value")
+ .ofType(classOf[String])
+ val listOpt = parser.accepts("list", ListDoc)
+ val describeOpt = parser.accepts("describe", DescribeDoc)
+ val deleteOpt = parser.accepts("delete", DeleteDoc)
+ val options = parser.parse(args : _*)
+
+ val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
+
+ def checkArgs() {
+ // check required args
+ CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+ if (options.has(describeOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
+ if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
+ CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
+
+ // check invalid args
+ CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c14bd45..c78a1b6 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -749,6 +749,26 @@ object ZkUtils extends Logging {
}.flatten.toSet
}
}
+
+ def getConsumerGroups(zkClient: ZkClient) = {
+ ZkUtils.getChildren(zkClient, ConsumersPath)
+ }
+
+ def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = {
+ ZkUtils.getChildrenParentMayNotExist(zkClient, new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
+ }
+
+ def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = {
+ val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath)
+ if (groups == null) Set.empty
+ else {
+ groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) =>
+ val topics = getChildren(zkClient, new ZKGroupDirs(group).consumerGroupOffsetsDir)
+ if (topics.contains(topic)) consumerGroupsForTopic + group
+ else consumerGroupsForTopic
+ }
+ }
+ }
}
object ZKStringSerializer extends ZkSerializer {
@@ -769,11 +789,13 @@ class ZKGroupDirs(val group: String) {
def consumerDir = ZkUtils.ConsumersPath
def consumerGroupDir = consumerDir + "/" + group
def consumerRegistryDir = consumerGroupDir + "/ids"
+ def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
+ def consumerGroupOwnersDir = consumerGroupDir + "/owners"
}
class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
- def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
- def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
+ def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic
+ def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
new file mode 100644
index 0000000..d530338
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils}
+import kafka.server.KafkaConfig
+import org.junit.Test
+import kafka.consumer._
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import kafka.integration.KafkaServerTestHarness
+
+
+class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
+ val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_))
+
+ @Test
+ def testGroupWideDeleteInZK() {
+ val topic = "test"
+ val groupToDelete = "groupToDelete"
+ val otherGroup = "otherGroup"
+
+ TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
+ fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
+
+ AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+
+ TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
+ "DeleteConsumerGroupInZK should delete the provided consumer group's directory")
+ TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)),
+ "DeleteConsumerGroupInZK should not delete unrelated consumer group directories")
+ }
+
+ @Test
+ def testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup() {
+ val topic = "test"
+ val groupToDelete = "groupToDelete"
+ val otherGroup = "otherGroup"
+
+ TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true)
+ fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
+
+ AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+
+ TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)),
+ "DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active")
+ TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)),
+ "DeleteConsumerGroupInZK should not delete unrelated consumer group directories")
+ }
+
+ @Test
+ def testGroupTopicWideDeleteInZKForGroupConsumingOneTopic() {
+ val topic = "test"
+ val groupToDelete = "groupToDelete"
+ val otherGroup = "otherGroup"
+ TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
+ fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
+
+ AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topic)
+
+ TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
+ "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic")
+ TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topic)),
+ "DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories")
+ }
+
+ @Test
+ def testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics() {
+ val topicToDelete = "topicToDelete"
+ val otherTopic = "otherTopic"
+ val groupToDelete = "groupToDelete"
+ val otherGroup = "otherGroup"
+ TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
+ TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+
+ fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false)
+ fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false)
+ fillInConsumerGroupInfo(topicToDelete, otherGroup, "consumer", 0, 10, false)
+
+ AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topicToDelete)
+
+ TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, topicToDelete)),
+ "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic")
+ TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, otherTopic)),
+ "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics")
+ TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topicToDelete)),
+ "DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories")
+ }
+
+ @Test
+ def testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics() {
+ val topicToDelete = "topicToDelete"
+ val otherTopic = "otherTopic"
+ val group = "group"
+ TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
+ TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+
+ fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true)
+ fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true)
+
+ AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete)
+
+ TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)),
+ "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active")
+ TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, otherTopic)),
+ "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics")
+ }
+
+ @Test
+ def testTopicWideDeleteInZK() {
+ val topicToDelete = "topicToDelete"
+ val otherTopic = "otherTopic"
+ val groups = Seq("group1", "group2")
+
+ TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
+ TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+ val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete))
+ val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic))
+ groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false))
+ groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false))
+
+ AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete)
+
+ TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
+ "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+ TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist),
+ "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+ }
+
+ @Test
+ def testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK() {
+ val topic = "topic"
+ val group = "group"
+
+ TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ val dir = new ZKGroupTopicDirs(group, topic)
+ fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)
+
+ AdminUtils.deleteTopic(zkClient, topic)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+
+ TestUtils.waitUntilTrue(() => !groupDirExists(dir),
+ "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+ //produce events
+ val producer = TestUtils.createNewProducer(brokerList)
+ produceEvents(producer, topic, List.fill(10)("test"))
+
+ //consume events
+ val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, "consumer")
+ consumerProps.put("auto.commit.enable", "false")
+ consumerProps.put("auto.offset.reset", "smallest")
+ consumerProps.put("consumer.timeout.ms", "2000")
+ consumerProps.put("fetch.wait.max.ms", "0")
+ val consumerConfig = new ConsumerConfig(consumerProps)
+ val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+ val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
+ consumeEvents(messageStream, 5)
+ consumerConnector.commitOffsets(false)
+ producer.close()
+ consumerConnector.shutdown()
+
+ TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(dir),
+ "Consumer group info should exist after consuming from a recreated topic")
+ }
+
+ private def fillInConsumerGroupInfo(topic: String, group: String, consumerId: String, partition: Int, offset: Int, registerConsumer: Boolean) {
+ val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, consumerId)
+ val consumerConfig = new ConsumerConfig(consumerProps)
+ val dir = new ZKGroupTopicDirs(group, topic)
+ TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset)
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "")
+ ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir)
+ if (registerConsumer) {
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "")
+ }
+ }
+
+ private def groupDirExists(dir: ZKGroupDirs) = {
+ ZkUtils.pathExists(zkClient, dir.consumerGroupDir)
+ }
+
+ private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = {
+ ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir)
+ }
+
+ private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) {
+ messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes)))
+ }
+
+ private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) {
+ val iter = messageStream.iterator
+ (0 until n).foreach(_ => iter.next)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 33c2767..0cbd726 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -16,23 +16,15 @@
*/
package kafka.admin
-import java.io.File
-
import kafka.log.Log
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._
import kafka.utils.{ZkUtils, TestUtils}
-import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaServer, KafkaConfig}
import org.junit.Test
-import kafka.common._
-import kafka.producer.{ProducerConfig, Producer}
import java.util.Properties
-import kafka.api._
-import kafka.consumer.SimpleConsumer
-import kafka.producer.KeyedMessage
import kafka.common.TopicAndPartition
-import kafka.api.PartitionOffsetRequestInfo
class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -43,7 +35,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkClient, topic)
- verifyTopicDeletion(topic, servers)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}
@@ -68,7 +60,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
"Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
// restart follower replica
follower.startup()
- verifyTopicDeletion(topic, servers)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}
@@ -95,7 +87,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
controller.startup()
follower.startup()
- verifyTopicDeletion(topic, servers)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}
@@ -141,7 +133,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
follower.startup()
- verifyTopicDeletion(topic, servers)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
allServers.foreach(_.shutdown())
}
@@ -160,7 +152,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
AdminUtils.deleteTopic(zkClient, topic)
follower.startup()
// test if topic deletion is resumed
- verifyTopicDeletion(topic, servers)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
TestUtils.waitUntilTrue(() =>
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
@@ -178,7 +170,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// add partitions to topic
val newPartition = TopicAndPartition(topic, 1)
AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
- verifyTopicDeletion(topic, servers)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
assertTrue("Replica logs not deleted after delete topic is complete",
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
@@ -193,7 +185,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkClient, topic)
- verifyTopicDeletion(topic, servers)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// re-create topic on same replicas
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// wait until leader is elected
@@ -213,7 +205,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// start topic deletion
AdminUtils.deleteTopic(zkClient, "test2")
// verify delete topic path for test2 is removed from zookeeper
- verifyTopicDeletion("test2", servers)
+ TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
// verify that topic test is untouched
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
res && server.getLogManager().getLog(topicAndPartition).isDefined),
@@ -252,7 +244,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// delete topic
AdminUtils.deleteTopic(zkClient, "test")
- verifyTopicDeletion("test", servers)
+ TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
servers.foreach(_.shutdown())
}
@@ -279,30 +271,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
servers
}
- private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) {
- val topicAndPartition = TopicAndPartition(topic, 0)
- // wait until admin path for delete topic is deleted, signaling completion of topic deletion
- TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
- "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
- TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
- "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
- // ensure that the topic-partition has been deleted from all brokers' replica managers
- TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None),
- "Replica manager's should have deleted all of this topic's partitions")
- // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
- assertTrue("Replica logs not deleted after delete topic is complete",
- servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
- // ensure that topic is removed from all cleaner offsets
- TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res &&
- {
- val topicAndPartition = TopicAndPartition(topic,0)
- val logdir = server.getLogManager().logDirs(0)
- val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read()
- !checkpoints.contains(topicAndPartition)
- }),
- "Cleaner offset for deleted partition should have been removed")
- }
-
private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
var counter = 0
for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 54755e8..21d0ed2 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -140,9 +140,10 @@ object TestUtils extends Logging {
* Create a test config for the given node id
*/
def createBrokerConfigs(numConfigs: Int,
- enableControlledShutdown: Boolean = true): List[Properties] = {
+ enableControlledShutdown: Boolean = true,
+ enableDeleteTopic: Boolean = false): List[Properties] = {
for((port, node) <- choosePorts(numConfigs).zipWithIndex)
- yield createBrokerConfig(node, port, enableControlledShutdown)
+ yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic)
}
def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
@@ -153,7 +154,8 @@ object TestUtils extends Logging {
* Create a test config for the given node id
*/
def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
- enableControlledShutdown: Boolean = true): Properties = {
+ enableControlledShutdown: Boolean = true,
+ enableDeleteTopic: Boolean = false): Properties = {
val props = new Properties
if (nodeId >= 0) props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
@@ -162,6 +164,7 @@ object TestUtils extends Logging {
props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
props.put("replica.socket.timeout.ms", "1500")
props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
+ props.put("delete.topic.enable", enableDeleteTopic.toString)
props
}
@@ -793,6 +796,30 @@ object TestUtils extends Logging {
}
messages.reverse
}
+
+ def verifyTopicDeletion(zkClient: ZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
+ val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+ // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+ TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+ "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
+ TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
+ "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
+ // ensure that the topic-partition has been deleted from all brokers' replica managers
+ TestUtils.waitUntilTrue(() =>
+ servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+ "Replica manager's should have deleted all of this topic's partitions")
+ // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
+ assertTrue("Replica logs not deleted after delete topic is complete",
+ servers.forall(server => topicAndPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty)))
+ // ensure that topic is removed from all cleaner offsets
+ TestUtils.waitUntilTrue(() => servers.forall(server => topicAndPartitions.forall { tp =>
+ val checkpoints = server.getLogManager().logDirs.map { logDir =>
+ new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+ }
+ checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+ }), "Cleaner offset for deleted partition should have been removed")
+ }
+
}
object TestZKUtils {