You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/16 23:04:29 UTC

kafka git commit: KAFKA-2831; Do not use ZKUtils in `ConsumerGroupCommand` if `new-consumer` is used

Repository: kafka
Updated Branches:
  refs/heads/trunk 4a3d244a2 -> 5fc4546de


KAFKA-2831; Do not use ZKUtils in `ConsumerGroupCommand` if `new-consumer` is used

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ashish Singh <as...@cloudera.com>, Jun Rao <ju...@gmail.com>

Closes #528 from ijuma/kafka-2831-consumer-group-command-zookeeper-new-consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5fc4546d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5fc4546d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5fc4546d

Branch: refs/heads/trunk
Commit: 5fc4546de7f238a8ee9c6f0b4fe276f0da47707c
Parents: 4a3d244
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Nov 16 14:04:26 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 16 14:04:26 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    |  16 +-
 .../kafka/admin/ConsumerGroupCommand.scala      | 542 ++++++++++---------
 2 files changed, 310 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5fc4546d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 1dea28b..181080f 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -146,11 +146,10 @@ class AdminClient(val time: Time,
     GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
   }
 
-  case class ConsumerSummary(
-                              memberId: String,
-                              clientId: String,
-                              clientHost: String,
-                              assignment: List[TopicPartition])
+  case class ConsumerSummary(memberId: String,
+                             clientId: String,
+                             clientHost: String,
+                             assignment: List[TopicPartition])
 
   def describeConsumerGroup(groupId: String): List[ConsumerSummary] = {
     val group = describeGroup(groupId)
@@ -169,6 +168,11 @@ class AdminClient(val time: Time,
       List.empty
     }
   }
+
+  def close() {
+    client.close()
+  }
+
 }
 
 object AdminClient {
@@ -249,4 +253,4 @@ object AdminClient {
       highLevelClient,
       bootstrapCluster.nodes().asScala.toList)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5fc4546d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index c29efe4..2d95767 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -17,7 +17,6 @@
 
 package kafka.admin
 
-
 import java.util.Properties
 
 import joptsimple.{OptionParser, OptionSpec}
@@ -33,7 +32,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.{Set, mutable}
 
 object ConsumerGroupCommand {
@@ -41,306 +40,352 @@ object ConsumerGroupCommand {
   def main(args: Array[String]) {
     val opts = new ConsumerGroupCommandOptions(args)
 
-    if(args.length == 0)
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
 
     // should have exactly one action
     val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
-    if(actions != 1)
+    if (actions != 1)
       CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
 
     opts.checkArgs()
 
-    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 
-                          30000,
-                          30000,
-                          JaasUtils.isZkSecurityEnabled())
+    val consumerGroupService = {
+      if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts)
+      else new ZkConsumerGroupService(opts)
+    }
 
     try {
       if (opts.options.has(opts.listOpt))
-        list(zkUtils, opts)
+        consumerGroupService.list()
       else if (opts.options.has(opts.describeOpt))
-        describe(zkUtils, opts)
-      else if (opts.options.has(opts.deleteOpt))
-        delete(zkUtils, opts)
+        consumerGroupService.describe()
+      else if (opts.options.has(opts.deleteOpt)) {
+        consumerGroupService match {
+          case service: ZkConsumerGroupService => service.delete()
+          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
+        }
+      }
     } catch {
       case e: Throwable =>
         println("Error while executing consumer group command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
-      zkUtils.close()
+      consumerGroupService.close()
     }
   }
 
-  def list(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
-    val useNewConsumer = opts.options.has(opts.newConsumerOpt)
-    if (!useNewConsumer)
-      zkUtils.getConsumerGroups().foreach(println)
-    else {
-      val adminClient = createAndGetAdminClient(opts)
-      adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
-    }
+  private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
+    val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
+    require(configsToBeAdded.forall(config => config.length == 2),
+      "Invalid config: all configs to be added must be in the format \"key=val\".")
+    val props = new Properties
+    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+    props
   }
 
-  def createAndGetAdminClient(opts: ConsumerGroupCommandOptions): AdminClient = {
-    AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
-  }
+  sealed trait ConsumerGroupService {
 
-  def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
-    val useNewConsumer = opts.options.has(opts.newConsumerOpt)
-    val group = opts.options.valueOf(opts.groupOpt)
-    val configs = parseConfigs(opts)
-    val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
-    val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
+    def list(): Unit
 
-    println("%s, %s, %s, %s, %s, %s, %s"
-      .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
+    def describe() {
+      describeGroup(opts.options.valueOf(opts.groupOpt))
+    }
 
-    if (!useNewConsumer) {
-      val topics = zkUtils.getTopicsByConsumerGroup(group)
-      if (topics.isEmpty) {
-        println("No topic available for consumer group provided")
-      } else {
-        topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
-      }
-    } else {
-      val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group)
+    def close(): Unit
 
-      if (consumers.isEmpty) {
-        println(s"Consumer group, ${group}, does not exist or is rebalancing.")
-      } else {
-        consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
+    protected def opts: ConsumerGroupCommandOptions
+
+    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult
+
+    protected def describeGroup(group: String): Unit
+
+    protected def describeTopicPartition(group: String,
+                                         topicPartitions: Seq[TopicAndPartition],
+                                         getPartitionOffset: TopicAndPartition => Option[Long],
+                                         getOwner: TopicAndPartition => Option[String]): Unit = {
+      topicPartitions
+        .sortBy { case topicPartition => topicPartition.partition }
+        .foreach { topicPartition =>
+          describePartition(group, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition),
+            getOwner(topicPartition))
+        }
+    }
+
+    protected def printDescribeHeader() {
+      println("GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER")
+    }
+
+    private def describePartition(group: String,
+                                  topic: String,
+                                  partition: Int,
+                                  offsetOpt: Option[Long],
+                                  ownerOpt: Option[String]) {
+      def print(logEndOffset: Option[Long]): Unit = {
+        val lag = offsetOpt.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
+        println(Seq(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset.getOrElse("unknown"),
+          lag.getOrElse("unknown"), ownerOpt.getOrElse("none")).mkString(", "))
+      }
+      getLogEndOffset(topic, partition) match {
+        case LogEndOffsetResult.LogEndOffset(logEndOffset) => print(Some(logEndOffset))
+        case LogEndOffsetResult.Unknown => print(None)
+        case LogEndOffsetResult.Ignore =>
       }
     }
+
   }
 
-  def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
-    if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) {
-      deleteForTopic(zkUtils, opts)
+  class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
+
+    private val zkUtils = {
+      val zkUrl = opts.options.valueOf(opts.zkConnectOpt)
+      ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled)
     }
-    else if (opts.options.has(opts.groupOpt)) {
-      deleteForGroup(zkUtils, opts)
+
+    def close() {
+      zkUtils.close()
     }
-    else if (opts.options.has(opts.topicOpt)) {
-      deleteAllForTopic(zkUtils, opts)
+
+    def list() {
+      zkUtils.getConsumerGroups().foreach(println)
     }
-  }
 
-  private def deleteForGroup(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
-    val groups = opts.options.valuesOf(opts.groupOpt)
-    groups.foreach { group =>
-      try {
-        if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
-          println("Deleted all consumer group information for group %s in zookeeper.".format(group))
+    def delete() {
+      if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt))
+        deleteForTopic()
+      else if (opts.options.has(opts.groupOpt))
+        deleteForGroup()
+      else if (opts.options.has(opts.topicOpt))
+        deleteAllForTopic()
+    }
+
+    protected def describeGroup(group: String) {
+      val configs = parseConfigs(opts)
+      val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
+      val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
+      val topics = zkUtils.getTopicsByConsumerGroup(group)
+      if (topics.isEmpty)
+        println("No topic available for consumer group provided")
+      printDescribeHeader()
+      topics.foreach(topic => describeTopic(group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
+    }
+
+    private def describeTopic(group: String,
+                              topic: String,
+                              channelSocketTimeoutMs: Int,
+                              channelRetryBackoffMs: Int) {
+      val topicPartitions = getTopicPartitions(topic)
+      val groupDirs = new ZKGroupTopicDirs(group, topic)
+      val ownerByTopicPartition = topicPartitions.flatMap { topicPartition =>
+        zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + topicPartition.partition)._1.map { owner =>
+          topicPartition -> owner
+        }
+      }.toMap
+      val partitionOffsets = getPartitionOffsets(group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
+      describeTopicPartition(group, topicPartitions, partitionOffsets.get, ownerByTopicPartition.get)
+    }
+
+    private def getTopicPartitions(topic: String): Seq[TopicAndPartition] = {
+      val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
+      val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
+      partitions.map(TopicAndPartition(topic, _))
+    }
+
+    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
+      zkUtils.getLeaderForPartition(topic, partition) match {
+        case Some(-1) => LogEndOffsetResult.Unknown
+        case Some(brokerId) =>
+          getZkConsumer(brokerId).map { consumer =>
+            val topicAndPartition = new TopicAndPartition(topic, partition)
+            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+            val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+            consumer.close()
+            LogEndOffsetResult.LogEndOffset(logEndOffset)
+          }.getOrElse(LogEndOffsetResult.Ignore)
+        case None =>
+          println(s"No broker for partition ${new TopicPartition(topic, partition)}")
+          LogEndOffsetResult.Ignore
+      }
+    }
+
+    private def getPartitionOffsets(group: String,
+                                    topicPartitions: Seq[TopicAndPartition],
+                                    channelSocketTimeoutMs: Int,
+                                    channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
+      val offsetMap = mutable.Map[TopicAndPartition, Long]()
+      val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
+      channel.send(OffsetFetchRequest(group, topicPartitions))
+      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
+
+      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
+        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
+          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
+          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool,
+          // meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
+          try {
+            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
+            offsetMap.put(topicAndPartition, offset)
+          } catch {
+            case z: ZkNoNodeException =>
+              println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
+                .format(group, topicAndPartition))
+          }
+        }
+        else if (offsetAndMetadata.error == ErrorMapping.NoError)
+          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
         else
-          println("Delete for group %s failed because its consumers are still active.".format(group))
+          println("Could not fetch offset from kafka for group %s partition %s due to %s."
+            .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
       }
-      catch {
-        case e: ZkNoNodeException =>
-          println("Delete for group %s failed because group does not exist.".format(group))
+      channel.disconnect()
+      offsetMap.toMap
+    }
+
+    private def deleteForGroup() {
+      val groups = opts.options.valuesOf(opts.groupOpt)
+      groups.asScala.foreach { group =>
+        try {
+          if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
+            println("Deleted all consumer group information for group %s in zookeeper.".format(group))
+          else
+            println("Delete for group %s failed because its consumers are still active.".format(group))
+        }
+        catch {
+          case e: ZkNoNodeException =>
+            println("Delete for group %s failed because group does not exist.".format(group))
+        }
       }
     }
-  }
 
-  private def deleteForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
-    val groups = opts.options.valuesOf(opts.groupOpt)
-    val topic = opts.options.valueOf(opts.topicOpt)
-    Topic.validate(topic)
-    groups.foreach { group =>
-      try {
-        if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
-          println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
-        else
-          println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
+    private def deleteForTopic() {
+      val groups = opts.options.valuesOf(opts.groupOpt)
+      val topic = opts.options.valueOf(opts.topicOpt)
+      Topic.validate(topic)
+      groups.asScala.foreach { group =>
+        try {
+          if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
+            println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
+          else
+            println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
+        }
+        catch {
+          case e: ZkNoNodeException =>
+            println("Delete for group %s topic %s failed because group does not exist.".format(group, topic))
+        }
       }
-      catch {
-        case e: ZkNoNodeException =>
-          println("Delete for group %s topic %s failed because group does not exist.".format(group, topic))
+    }
+
+    private def deleteAllForTopic() {
+      val topic = opts.options.valueOf(opts.topicOpt)
+      Topic.validate(topic)
+      AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
+      println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
+    }
+
+    private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
+      try {
+        zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+          case Some(brokerInfoString) =>
+            Json.parseFull(brokerInfoString) match {
+              case Some(m) =>
+                val brokerInfo = m.asInstanceOf[Map[String, Any]]
+                val host = brokerInfo.get("host").get.asInstanceOf[String]
+                val port = brokerInfo.get("port").get.asInstanceOf[Int]
+                Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
+              case None =>
+                throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+            }
+          case None =>
+            throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+        }
+      } catch {
+        case t: Throwable =>
+          println("Could not parse broker info due to " + t.getMessage)
+          None
       }
     }
-  }
 
-  private def deleteAllForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
-    val topic = opts.options.valueOf(opts.topicOpt)
-    Topic.validate(topic)
-    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
-    println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
   }
 
-  private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
-    val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
-    require(configsToBeAdded.forall(config => config.length == 2),
-            "Invalid config: all configs to be added must be in the format \"key=val\".")
-    val props = new Properties
-    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
-    props
-  }
+  class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
 
-  private def describeTopic(zkUtils: ZkUtils,
-                            group: String,
-                            topic: String,
-                            channelSocketTimeoutMs: Int,
-                            channelRetryBackoffMs: Int,
-                            opts: ConsumerGroupCommandOptions) {
-    val topicPartitions = getTopicPartitions(zkUtils, topic)
-    describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions)
-  }
+    private val adminClient = createAdminClient()
 
-  def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owner: Option[String] = None): Unit = {
-    val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
-    topicPartitions
-      .sortBy { case topicPartition => topicPartition.partition }
-      .foreach { topicPartition =>
-      describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owner)
-    }
-  }
+    // `consumer` is only needed for `describe`, so we instantiate it lazily
+    private var consumer: KafkaConsumer[String, String] = null
 
-  private def getTopicPartitions(zkUtils: ZkUtils, topic: String) = {
-    val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
-    val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
-    partitions.map(TopicAndPartition(topic, _))
-  }
+    def list() {
+      adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
+    }
 
-  private def getPartitionOffsets(zkUtils: ZkUtils,
-                                  group: String,
-                                  topicPartitions: Seq[TopicAndPartition],
-                                  channelSocketTimeoutMs: Int,
-                                  channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
-    val offsetMap = mutable.Map[TopicAndPartition, Long]()
-    val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
-    channel.send(OffsetFetchRequest(group, topicPartitions))
-    val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
-
-    offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
-      if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
-        val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
-        // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
-        // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
-        try {
-          val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
-          offsetMap.put(topicAndPartition, offset)
-        } catch {
-          case z: ZkNoNodeException =>
-            println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
-              .format(group, topicAndPartition))
+    protected def describeGroup(group: String) {
+      val consumerSummaries = adminClient.describeConsumerGroup(group)
+      if (consumerSummaries.isEmpty)
+        println(s"Consumer group `${group}` does not exist or is rebalancing.")
+      else {
+        val consumer = getConsumer()
+        printDescribeHeader()
+        consumerSummaries.foreach { consumerSummary =>
+          val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
+          val partitionOffsets = topicPartitions.flatMap { topicPartition =>
+            Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
+              topicPartition -> offsetAndMetadata.offset
+            }
+          }.toMap
+          describeTopicPartition(group, topicPartitions, partitionOffsets.get,
+            _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
         }
       }
-      else if (offsetAndMetadata.error == ErrorMapping.NoError)
-        offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
-      else
-        println("Could not fetch offset from kafka for group %s partition %s due to %s."
-          .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
     }
-    channel.disconnect()
-    offsetMap.toMap
-  }
 
-  private def describePartition(zkUtils: ZkUtils,
-                                group: String,
-                                topic: String,
-                                partition: Int,
-                                offsetOpt: Option[Long],
-                                opts: ConsumerGroupCommandOptions,
-                                ownerOpt: Option[String] = None) {
-    val topicPartition = new TopicPartition(topic, partition)
-    val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt)
-    val owner: Option[String] = if (useNewConsumer) ownerOpt else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
-    def print(logEndOffset: Long): Unit = {
-      val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
-      println("%s, %s, %s, %s, %s, %s, %s"
-        .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
+    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
+      val consumer = getConsumer()
+      val topicPartition = new TopicPartition(topic, partition)
+      consumer.assign(List(topicPartition).asJava)
+      consumer.seekToEnd(topicPartition)
+      val logEndOffset = consumer.position(topicPartition)
+      LogEndOffsetResult.LogEndOffset(logEndOffset)
     }
-    zkUtils.getLeaderForPartition(topic, partition) match {
-      case Some(-1) =>
-        println("%s, %s, %s, %s, %s, %s, %s"
-          .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
-      case Some(brokerId) =>
-        if (useNewConsumer) {
-          val consumerOpt = getNewConsumer(zkUtils, brokerId)
-          consumerOpt match {
-            case Some(consumer) =>
-              consumer.assign(List(topicPartition))
-              consumer.seekToEnd(topicPartition)
-              val logEndOffset = consumer.position(topicPartition)
-              consumer.close()
-              print(logEndOffset)
-            case None => // ignore
-          }
-        } else {
-          val consumerOpt = getZkConsumer(zkUtils, brokerId)
-          consumerOpt match {
-            case Some(consumer) =>
-              val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition())
-              val request =
-                OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-              val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-              consumer.close()
-              print(logEndOffset)
-            case None => // ignore
-          }
-        }
-      case None =>
-        println("No broker for partition %s".format(topicPartition))
+
+    def close() {
+      adminClient.close()
+      if (consumer != null) consumer.close()
     }
-  }
 
-  private def getNewConsumer(zkUtils: ZkUtils, brokerId: Int): Option[KafkaConsumer[String, String]] = {
-    try {
-      zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
-        case Some(brokerInfoString) =>
-          Json.parseFull(brokerInfoString) match {
-            case Some(m) =>
-              val brokerInfo = m.asInstanceOf[Map[String, Any]]
-              val host = brokerInfo.get("host").get.asInstanceOf[String]
-              val port = brokerInfo.get("port").get.asInstanceOf[Int]
-              val deserializer: String = (new StringDeserializer).getClass.getName
-              val properties: Properties = new Properties()
-              properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port)
-              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand")
-              properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-              properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
-              properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
-              properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
-              Some(new KafkaConsumer[String, String](properties))
-            case None =>
-              throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
-          }
-        case None =>
-          throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
-      }
-    } catch {
-      case t: Throwable =>
-        println("Could not parse broker info due to " + t.getMessage)
-        None
+    private def createAdminClient(): AdminClient =
+      AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
+
+    private def getConsumer() = {
+      if (consumer == null)
+        consumer = createNewConsumer()
+      consumer
     }
-  }
 
-  private def getZkConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
-    try {
-      zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
-        case Some(brokerInfoString) =>
-          Json.parseFull(brokerInfoString) match {
-            case Some(m) =>
-              val brokerInfo = m.asInstanceOf[Map[String, Any]]
-              val host = brokerInfo.get("host").get.asInstanceOf[String]
-              val port = brokerInfo.get("port").get.asInstanceOf[Int]
-              Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
-            case None =>
-              throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
-          }
-        case None =>
-          throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
-      }
-    } catch {
-      case t: Throwable =>
-        println("Could not parse broker info due to " + t.getMessage)
-        None
+    private def createNewConsumer(): KafkaConsumer[String, String] = {
+      val properties = new Properties()
+      val deserializer = (new StringDeserializer).getClass.getName
+      val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
+      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+      properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
+      properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+      properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
+      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
+      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
+      new KafkaConsumer(properties)
     }
+
+  }
+
+  sealed trait LogEndOffsetResult
+
+  object LogEndOffsetResult {
+    case class LogEndOffset(value: Long) extends LogEndOffsetResult
+    case object Unknown extends LogEndOffsetResult
+    case object Ignore extends LogEndOffsetResult
   }
 
   class ConsumerGroupCommandOptions(args: Array[String]) {
-    val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+    val ZkConnectDoc = "REQUIRED (unless new-consumer is used): The connection string for the zookeeper connection in the form host:port. " +
       "Multiple URLS can be given to allow fail-over."
     val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
     val GroupDoc = "The consumer group we wish to act on."
@@ -390,10 +435,23 @@ object ConsumerGroupCommand {
       // check required args
       if (options.has(newConsumerOpt)) {
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+
+        if (options.has(zkConnectOpt))
+          CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt")
+
         if (options.has(deleteOpt))
-          CommandLineUtils.printUsageAndDie(parser, "Option %s does not work with %s".format(deleteOpt, newConsumerOpt))
-      } else
+          CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " +
+            "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " +
+            "member leaves")
+
+      } else {
         CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+
+        if (options.has(bootstrapServerOpt))
+          CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt")
+
+      }
+
       if (options.has(describeOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
       if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))