You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/10 20:59:08 UTC

kafka git commit: KAFKA-1476 Added a ConsumerCommand tool that will replace other consumer related tools in the future; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 71602de0b -> 9fe9913e9


KAFKA-1476 Added a ConsumerCommand tool that will replace other consumer related tools in the future; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fe9913e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fe9913e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fe9913e

Branch: refs/heads/trunk
Commit: 9fe9913e95e1d3e114c74620d8da40f804f71b18
Parents: 71602de
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue Feb 10 11:58:54 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 10 11:59:01 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  56 +++-
 .../kafka/admin/ConsumerGroupCommand.scala      | 310 +++++++++++++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  26 +-
 .../kafka/admin/DeleteConsumerGroupTest.scala   | 212 +++++++++++++
 .../unit/kafka/admin/DeleteTopicTest.scala      |  52 +---
 .../test/scala/unit/kafka/utils/TestUtils.scala |  33 +-
 6 files changed, 641 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 28b12c7..b700110 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
-import kafka.utils.{Logging, ZkUtils, Json}
+import kafka.utils._
 import kafka.api.{TopicMetadata, PartitionMetadata}
 
 import java.util.Random
@@ -164,6 +164,60 @@ object AdminUtils extends Logging {
     ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
   }
   
+  def isConsumerGroupActive(zkClient: ZkClient, group: String) = {
+    ZkUtils.getConsumersInGroup(zkClient, group).nonEmpty
+  }
+
+  /**
+   * Delete the whole directory of the given consumer group if the group is inactive.
+   *
+   * @param zkClient Zookeeper client
+   * @param group Consumer group
+   * @return whether or not we deleted the consumer group information
+   */
+  def deleteConsumerGroupInZK(zkClient: ZkClient, group: String) = {
+    if (!isConsumerGroupActive(zkClient, group)) {
+      val dir = new ZKGroupDirs(group)
+      ZkUtils.deletePathRecursive(zkClient, dir.consumerGroupDir)
+      true
+    }
+    else false
+  }
+
+  /**
+   * Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
+   * If the consumer group consumes no other topics, delete the whole consumer group directory.
+   *
+   * @param zkClient Zookeeper client
+   * @param group Consumer group
+   * @param topic Topic of the consumer group information we wish to delete
+   * @return whether or not we deleted the consumer group information for the given topic
+   */
+  def deleteConsumerGroupInfoForTopicInZK(zkClient: ZkClient, group: String, topic: String) = {
+    val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+    if (topics == Seq(topic)) {
+      deleteConsumerGroupInZK(zkClient, group)
+    }
+    else if (!isConsumerGroupActive(zkClient, group)) {
+      val dir = new ZKGroupTopicDirs(group, topic)
+      ZkUtils.deletePathRecursive(zkClient, dir.consumerOwnerDir)
+      ZkUtils.deletePathRecursive(zkClient, dir.consumerOffsetDir)
+      true
+    }
+    else false
+  }
+
+  /**
+   * Delete every inactive consumer group's information about the given topic in Zookeeper.
+   *
+   * @param zkClient Zookeeper client
+   * @param topic Topic of the consumer group information we wish to delete
+   */
+  def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) {
+    val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic)
+    groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+  }
+
   def topicExists(zkClient: ZkClient, topic: String): Boolean = 
     zkClient.exists(ZkUtils.getTopicPath(topic))
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
new file mode 100644
index 0000000..89fa29a
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import kafka.common._
+import java.util.Properties
+import kafka.client.ClientUtils
+import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest}
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+import kafka.common.TopicAndPartition
+import joptsimple.{OptionSpec, OptionParser}
+import scala.collection.{Set, mutable}
+import kafka.consumer.SimpleConsumer
+import collection.JavaConversions._
+
+
+object ConsumerGroupCommand {
+
+  def main(args: Array[String]) {
+    val opts = new ConsumerGroupCommandOptions(args)
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
+
+    // should have exactly one action
+    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
+    if(actions != 1)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
+
+    opts.checkArgs()
+
+    val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+
+    try {
+      if (opts.options.has(opts.listOpt))
+        list(zkClient)
+      else if (opts.options.has(opts.describeOpt))
+        describe(zkClient, opts)
+      else if (opts.options.has(opts.deleteOpt))
+        delete(zkClient, opts)
+    } catch {
+      case e: Throwable =>
+        println("Error while executing consumer group command " + e.getMessage)
+        println(Utils.stackTrace(e))
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  def list(zkClient: ZkClient) {
+    ZkUtils.getConsumerGroups(zkClient).foreach(println)
+  }
+
+  // Prints the offsets of every topic found for the group; --config values may override the channel timeouts.
+  def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+    val configs = parseConfigs(opts)
+    val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
+    // "channelRetryBackoffMsOpt" was a misnamed key; it is still honored as a fallback for existing callers
+    val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMs", configs.getProperty("channelRetryBackoffMsOpt", "300")).toInt
+    val group = opts.options.valueOf(opts.groupOpt)
+    val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+    if (topics.isEmpty) println("No topic available for consumer group provided")
+    topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
+  }
+
+  def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+    if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) {
+      deleteForTopic(zkClient, opts)
+    }
+    else if (opts.options.has(opts.groupOpt)) {
+      deleteForGroup(zkClient, opts)
+    }
+    else if (opts.options.has(opts.topicOpt)) {
+      deleteAllForTopic(zkClient, opts)
+    }
+  }
+
+  private def deleteForGroup(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+    val groups = opts.options.valuesOf(opts.groupOpt)
+    groups.foreach { group =>
+      try {
+        if (AdminUtils.deleteConsumerGroupInZK(zkClient, group))
+          println("Deleted all consumer group information for group %s in zookeeper.".format(group))
+        else
+          println("Delete for group %s failed because its consumers are still active.".format(group))
+      }
+      catch {
+        case e: ZkNoNodeException =>
+          println("Delete for group %s failed because group does not exist.".format(group))
+      }
+    }
+  }
+
+  private def deleteForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+    val groups = opts.options.valuesOf(opts.groupOpt)
+    val topic = opts.options.valueOf(opts.topicOpt)
+    Topic.validate(topic)
+    groups.foreach { group =>
+      try {
+        if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+          println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
+        else
+          println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
+      }
+      catch {
+        case e: ZkNoNodeException =>
+          println("Delete for group %s topic %s failed because group does not exist.".format(group, topic))
+      }
+    }
+  }
+
+  private def deleteAllForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+    val topic = opts.options.valueOf(opts.topicOpt)
+    Topic.validate(topic)
+    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+    println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
+  }
+
+  private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
+    val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
+    require(configsToBeAdded.forall(config => config.length == 2),
+            "Invalid config: all configs to be added must be in the format \"key=val\".")
+    val props = new Properties
+    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+    props
+  }
+
+  private def describeTopic(zkClient: ZkClient,
+                            group: String,
+                            topic: String,
+                            channelSocketTimeoutMs: Int,
+                            channelRetryBackoffMs: Int) {
+    val topicPartitions = getTopicPartitions(zkClient, topic)
+    val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
+    println("%s, %s, %s, %s, %s, %s, %s"
+      .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
+    topicPartitions
+      .sortBy { case topicPartition => topicPartition.partition }
+      .foreach { topicPartition =>
+      describePartition(zkClient, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
+    }
+  }
+
+  private def getTopicPartitions(zkClient: ZkClient, topic: String) = {
+    val topicPartitionMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
+    val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
+    partitions.map(TopicAndPartition(topic, _))
+  }
+
+  // Returns the group's committed offsets (offset manager first, zookeeper fallback); unreadable partitions are omitted.
+  private def getPartitionOffsets(zkClient: ZkClient,
+                                  group: String,
+                                  topicPartitions: Seq[TopicAndPartition],
+                                  channelSocketTimeoutMs: Int,
+                                  channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
+    val offsetMap = mutable.Map[TopicAndPartition, Long]()
+    val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
+    try { // disconnect the channel on every exit path, including a failed fetch or zookeeper read
+      channel.send(OffsetFetchRequest(group, topicPartitions))
+      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
+      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
+        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
+          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
+          // this group may not have migrated off zookeeper for offsets storage (this tool does not expose the dual-commit
+          // option), so the lag may be off until all the consumers in the group have the same setting for offsets storage
+          try {
+            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
+            offsetMap.put(topicAndPartition, offset)
+          } catch {
+            case z: ZkNoNodeException =>
+              println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
+                .format(group, topicAndPartition))
+          }
+        }
+        else if (offsetAndMetadata.error == ErrorMapping.NoError)
+          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
+        else
+          println("Could not fetch offset from kafka for group %s partition %s due to %s.".format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+      }
+    } finally channel.disconnect()
+    offsetMap.toMap
+  }
+
+  // Prints one comma-separated status row for the partition: current offset, log end offset, lag and owner.
+  private def describePartition(zkClient: ZkClient,
+                                group: String,
+                                topic: String,
+                                partition: Int,
+                                offsetOpt: Option[Long]) {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    val groupDirs = new ZKGroupTopicDirs(group, topic)
+    val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1
+    ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
+      case Some(-1) =>
+        println("%s, %s, %s, %s, %s, %s, %s"
+          .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
+      case Some(brokerId) =>
+        getConsumer(zkClient, brokerId) match {
+          case Some(consumer) =>
+            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+            val logEndOffset =
+              try consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+              finally consumer.close() // release the broker connection even when the offset request fails
+
+            val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
+            println("%s, %s, %s, %s, %s, %s, %s"
+              .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
+          case None => // nothing to print here: getConsumer has already reported why no consumer could be created
+        }
+      case None =>
+        println("No broker for partition %s".format(topicAndPartition))
+    }
+  }
+
+  private def getConsumer(zkClient: ZkClient, brokerId: Int): Option[SimpleConsumer] = {
+    try {
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+        case Some(brokerInfoString) =>
+          Json.parseFull(brokerInfoString) match {
+            case Some(m) =>
+              val brokerInfo = m.asInstanceOf[Map[String, Any]]
+              val host = brokerInfo.get("host").get.asInstanceOf[String]
+              val port = brokerInfo.get("port").get.asInstanceOf[Int]
+              Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
+            case None =>
+              throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+          }
+        case None =>
+          throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+      }
+    } catch {
+      case t: Throwable =>
+        println("Could not parse broker info due to " + t.getMessage)
+        None
+    }
+  }
+
+  class ConsumerGroupCommandOptions(args: Array[String]) {
+    val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+      "Multiple URLS can be given to allow fail-over."
+    val GroupDoc = "The consumer group we wish to act on."
+    val TopicDoc = "The topic whose consumer group information should be deleted."
+    val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
+    val ListDoc = "List all consumer groups."
+    val DescribeDoc = "Describe consumer group and list offset lag related to given group."
+    val nl = System.getProperty("line.separator")
+    val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " +
+      "over the entire consumer group. For instance --group g1 --group g2" + nl +
+      "Pass in groups with a single topic to just delete the given topic's partition offsets and ownership " +
+      "information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl +
+      "Pass in just a topic to delete the given topic's partition offsets and ownership information " +
+      "for every consumer group. For instance --topic t1" + nl +
+      "WARNING: Only does deletions on consumer groups that are not active."
+    val parser = new OptionParser
+    val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
+                             .withRequiredArg
+                             .describedAs("urls")
+                             .ofType(classOf[String])
+    val groupOpt = parser.accepts("group", GroupDoc)
+                         .withRequiredArg
+                         .describedAs("consumer group")
+                         .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", TopicDoc)
+                         .withRequiredArg
+                         .describedAs("topic")
+                         .ofType(classOf[String])
+    val configOpt = parser.accepts("config", ConfigDoc)
+                          .withRequiredArg
+                          .describedAs("name=value")
+                          .ofType(classOf[String])
+    val listOpt = parser.accepts("list", ListDoc)
+    val describeOpt = parser.accepts("describe", DescribeDoc)
+    val deleteOpt = parser.accepts("delete", DeleteDoc)
+    val options = parser.parse(args : _*)
+
+    val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
+
+    def checkArgs() {
+      // check required args
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      if (options.has(describeOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
+      if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
+        CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
+
+      // check invalid args
+      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c14bd45..c78a1b6 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -749,6 +749,26 @@ object ZkUtils extends Logging {
       }.flatten.toSet
     }
   }
+
+  def getConsumerGroups(zkClient: ZkClient) = {
+    ZkUtils.getChildren(zkClient, ConsumersPath)
+  }
+
+  def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = {
+    ZkUtils.getChildrenParentMayNotExist(zkClient, new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
+  }
+
+  // Returns the consumer groups that have an entry for the given topic under their zookeeper offsets directory.
+  def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = {
+    val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath)
+    if (groups == null) Set.empty
+    else groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) =>
+      // A group without an offsets directory must not abort the whole scan, so use the lookup that tolerates a
+      // missing parent. NOTE(review): presumably it yields an empty list (or null) in that case - both are handled.
+      val topics = Option(getChildrenParentMayNotExist(zkClient, new ZKGroupDirs(group).consumerGroupOffsetsDir))
+      if (topics.exists(_.contains(topic))) consumerGroupsForTopic + group else consumerGroupsForTopic
+    }
+  }
 }
 
 object ZKStringSerializer extends ZkSerializer {
@@ -769,11 +789,13 @@ class ZKGroupDirs(val group: String) {
   def consumerDir = ZkUtils.ConsumersPath
   def consumerGroupDir = consumerDir + "/" + group
   def consumerRegistryDir = consumerGroupDir + "/ids"
+  def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
+  def consumerGroupOwnersDir = consumerGroupDir + "/owners"
 }
 
 class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
-  def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
-  def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
+  def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic
+  def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
new file mode 100644
index 0000000..d530338
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils}
+import kafka.server.KafkaConfig
+import org.junit.Test
+import kafka.consumer._
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import kafka.integration.KafkaServerTestHarness
+
+
+class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
+  val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_))
+
+  @Test
+  def testGroupWideDeleteInZK() {
+    val topic = "test"
+    val groupToDelete = "groupToDelete"
+    val otherGroup = "otherGroup"
+
+    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
+    fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
+
+    AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+
+    TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
+      "DeleteConsumerGroupInZK should delete the provided consumer group's directory")
+    TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)),
+      "DeleteConsumerGroupInZK should not delete unrelated consumer group directories")
+  }
+
+  @Test
+  def testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup() {
+    val topic = "test"
+    val groupToDelete = "groupToDelete"
+    val otherGroup = "otherGroup"
+
+    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true)
+    fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
+
+    AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+
+    TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)),
+      "DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active")
+    TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)),
+      "DeleteConsumerGroupInZK should not delete unrelated consumer group directories")
+  }
+
+  @Test
+  def testGroupTopicWideDeleteInZKForGroupConsumingOneTopic() {
+    val topic = "test"
+    val groupToDelete = "groupToDelete"
+    val otherGroup = "otherGroup"
+    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
+    fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
+
+    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topic)
+
+    TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
+      "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic")
+    TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topic)),
+      "DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories")
+  }
+
+  @Test
+  def testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics() {
+    val topicToDelete = "topicToDelete"
+    val otherTopic = "otherTopic"
+    val groupToDelete = "groupToDelete"
+    val otherGroup = "otherGroup"
+    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
+    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+
+    fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false)
+    fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false)
+    fillInConsumerGroupInfo(topicToDelete, otherGroup, "consumer", 0, 10, false)
+
+    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topicToDelete)
+
+    TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, topicToDelete)),
+      "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic")
+    TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, otherTopic)),
+      "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics")
+    TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topicToDelete)),
+      "DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories")
+  }
+
+  @Test
+  def testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics() {
+    val topicToDelete = "topicToDelete"
+    val otherTopic = "otherTopic"
+    val group = "group"
+    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
+    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+
+    fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true)
+    fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true)
+
+    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete)
+
+    TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)),
+      "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active")
+    TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, otherTopic)),
+      "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics")
+  }
+
+  @Test
+  def testTopicWideDeleteInZK() {
+    val topicToDelete = "topicToDelete"
+    val otherTopic = "otherTopic"
+    val groups = Seq("group1", "group2")
+
+    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
+    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+    val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete))
+    val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic))
+    groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false))
+    groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false))
+
+    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete)
+
+    TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
+      "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+    TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist),
+      "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+  }
+
+  @Test
+  def testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK() {
+    val topic = "topic"
+    val group = "group"
+
+    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    val dir = new ZKGroupTopicDirs(group, topic)
+    fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)
+
+    AdminUtils.deleteTopic(zkClient, topic)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+
+    TestUtils.waitUntilTrue(() => !groupDirExists(dir),
+      "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+    //produce events
+    val producer = TestUtils.createNewProducer(brokerList)
+    produceEvents(producer, topic, List.fill(10)("test"))
+
+    //consume events
+    val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, "consumer")
+    consumerProps.put("auto.commit.enable", "false")
+    consumerProps.put("auto.offset.reset", "smallest")
+    consumerProps.put("consumer.timeout.ms", "2000")
+    consumerProps.put("fetch.wait.max.ms", "0")
+    val consumerConfig = new ConsumerConfig(consumerProps)
+    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
+    consumeEvents(messageStream, 5)
+    consumerConnector.commitOffsets(false)
+    producer.close()
+    consumerConnector.shutdown()
+
+    TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(dir),
+      "Consumer group info should exist after consuming from a recreated topic")
+  }
+
+  private def fillInConsumerGroupInfo(topic: String, group: String, consumerId: String, partition: Int, offset: Int, registerConsumer: Boolean) {
+    val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, consumerId)
+    val consumerConfig = new ConsumerConfig(consumerProps)
+    val dir = new ZKGroupTopicDirs(group, topic)
+    TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset)
+    ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "")
+    ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir)
+    if (registerConsumer) {
+      ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "")
+    }
+  }
+
+  private def groupDirExists(dir: ZKGroupDirs) = {
+    ZkUtils.pathExists(zkClient, dir.consumerGroupDir)
+  }
+
+  private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = {
+    ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir)
+  }
+
+  private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) {
+    messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes)))
+  }
+
+  private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) {
+    val iter = messageStream.iterator
+    (0 until n).foreach(_ => iter.next)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 33c2767..0cbd726 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -16,23 +16,15 @@
  */
 package kafka.admin
 
-import java.io.File
-
 import kafka.log.Log
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaServer, KafkaConfig}
 import org.junit.Test
-import kafka.common._
-import kafka.producer.{ProducerConfig, Producer}
 import java.util.Properties
-import kafka.api._
-import kafka.consumer.SimpleConsumer
-import kafka.producer.KeyedMessage
 import kafka.common.TopicAndPartition
-import kafka.api.PartitionOffsetRequestInfo
 
 class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -43,7 +35,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkClient, topic)
-    verifyTopicDeletion(topic, servers)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -68,7 +60,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
       "Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
     // restart follower replica
     follower.startup()
-    verifyTopicDeletion(topic, servers)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -95,7 +87,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     controller.startup()
     follower.startup()
 
-    verifyTopicDeletion(topic, servers)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -141,7 +133,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
     assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
     follower.startup()
-    verifyTopicDeletion(topic, servers)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     allServers.foreach(_.shutdown())
   }
 
@@ -160,7 +152,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.deleteTopic(zkClient, topic)
     follower.startup()
     // test if topic deletion is resumed
-    verifyTopicDeletion(topic, servers)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     TestUtils.waitUntilTrue(() =>
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
@@ -178,7 +170,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // add partitions to topic
     val newPartition = TopicAndPartition(topic, 1)
     AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
-    verifyTopicDeletion(topic, servers)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
@@ -193,7 +185,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkClient, topic)
-    verifyTopicDeletion(topic, servers)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // re-create topic on same replicas
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until leader is elected
@@ -213,7 +205,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // start topic deletion
     AdminUtils.deleteTopic(zkClient, "test2")
     // verify delete topic path for test2 is removed from zookeeper
-    verifyTopicDeletion("test2", servers)
+    TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
     // verify that topic test is untouched
     TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
       res && server.getLogManager().getLog(topicAndPartition).isDefined),
@@ -252,7 +244,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // delete topic
     AdminUtils.deleteTopic(zkClient, "test")
-    verifyTopicDeletion("test", servers)
+    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
 
     servers.foreach(_.shutdown())
   }
@@ -279,30 +271,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers
   }
 
-  private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) {
-    val topicAndPartition = TopicAndPartition(topic, 0)
-    // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
-      "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
-    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
-      "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
-    // ensure that the topic-partition has been deleted from all brokers' replica managers
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None),
-      "Replica manager's should have deleted all of this topic's partitions")
-    // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
-    assertTrue("Replica logs not deleted after delete topic is complete",
-      servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
-    // ensure that topic is removed from all cleaner offsets
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res &&
-    {
-      val topicAndPartition = TopicAndPartition(topic,0)
-      val logdir = server.getLogManager().logDirs(0)
-      val checkpoints =  new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read()
-      !checkpoints.contains(topicAndPartition)
-    }),
-      "Cleaner offset for deleted partition should have been removed")
-  }
-
   private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
     var counter = 0
     for(dup <- 0 until numDups; key <- 0 until numKeys) yield {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fe9913e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 54755e8..21d0ed2 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -140,9 +140,10 @@ object TestUtils extends Logging {
    * Create a test config for the given node id
    */
   def createBrokerConfigs(numConfigs: Int,
-    enableControlledShutdown: Boolean = true): List[Properties] = {
+    enableControlledShutdown: Boolean = true,
+    enableDeleteTopic: Boolean = false): List[Properties] = {
     for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-    yield createBrokerConfig(node, port, enableControlledShutdown)
+    yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic)
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
@@ -153,7 +154,8 @@ object TestUtils extends Logging {
    * Create a test config for the given node id
    */
   def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
-    enableControlledShutdown: Boolean = true): Properties = {
+    enableControlledShutdown: Boolean = true,
+    enableDeleteTopic: Boolean = false): Properties = {
     val props = new Properties
     if (nodeId >= 0) props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
@@ -162,6 +164,7 @@ object TestUtils extends Logging {
     props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
     props.put("replica.socket.timeout.ms", "1500")
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
+    props.put("delete.topic.enable", enableDeleteTopic.toString)
     props
   }
 
@@ -793,6 +796,30 @@ object TestUtils extends Logging {
     }
     messages.reverse
   }
+
+  /**
+   * Waits for the deletion of `topic` to finish: its ZooKeeper delete-topic and topic paths must vanish and every
+   * server must drop partitions 0 until numPartitions from its replica manager, logs and cleaner checkpoints.
+   */
+  def verifyTopicDeletion(zkClient: ZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = {
+    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+    // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+      "Admin path %s not deleted even after a replica is restarted".format(ZkUtils.getDeleteTopicPath(topic)))
+    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
+      "Topic path %s not deleted after %s path is deleted".format(ZkUtils.getTopicPath(topic), ZkUtils.getDeleteTopicPath(topic)))
+    TestUtils.waitUntilTrue(() =>
+      servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      "Replica manager's should have deleted all of this topic's partitions")
+    // logs are asserted once, without retrying: they must already be gone when zookeeper marks the deletion successful
+    assertTrue("Replica logs not deleted after delete topic is complete",
+      servers.forall(server => topicAndPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty)))
+    TestUtils.waitUntilTrue(() => servers.forall { server =>
+      val checkpoints = server.getLogManager().logDirs.map(dir => new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")).read())
+      topicAndPartitions.forall(tp => checkpoints.forall(checkpoint => !checkpoint.contains(tp)))
+    }, "Cleaner offset for deleted partition should have been removed")
+  }
+
 }
 
 object TestZKUtils {