You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/16 23:04:29 UTC
kafka git commit: KAFKA-2831; Do not use ZKUtils in `ConsumerGroupCommand` if `new-consumer` is used
Repository: kafka
Updated Branches:
refs/heads/trunk 4a3d244a2 -> 5fc4546de
KAFKA-2831; Do not use ZKUtils in `ConsumerGroupCommand` if `new-consumer` is used
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ashish Singh <as...@cloudera.com>, Jun Rao <ju...@gmail.com>
Closes #528 from ijuma/kafka-2831-consumer-group-command-zookeeper-new-consumer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5fc4546d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5fc4546d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5fc4546d
Branch: refs/heads/trunk
Commit: 5fc4546de7f238a8ee9c6f0b4fe276f0da47707c
Parents: 4a3d244
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Nov 16 14:04:26 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 16 14:04:26 2015 -0800
----------------------------------------------------------------------
.../main/scala/kafka/admin/AdminClient.scala | 16 +-
.../kafka/admin/ConsumerGroupCommand.scala | 542 ++++++++++---------
2 files changed, 310 insertions(+), 248 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5fc4546d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 1dea28b..181080f 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -146,11 +146,10 @@ class AdminClient(val time: Time,
GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
}
- case class ConsumerSummary(
- memberId: String,
- clientId: String,
- clientHost: String,
- assignment: List[TopicPartition])
+ case class ConsumerSummary(memberId: String,
+ clientId: String,
+ clientHost: String,
+ assignment: List[TopicPartition])
def describeConsumerGroup(groupId: String): List[ConsumerSummary] = {
val group = describeGroup(groupId)
@@ -169,6 +168,11 @@ class AdminClient(val time: Time,
List.empty
}
}
+
+ def close() {
+ client.close()
+ }
+
}
object AdminClient {
@@ -249,4 +253,4 @@ object AdminClient {
highLevelClient,
bootstrapCluster.nodes().asScala.toList)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5fc4546d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index c29efe4..2d95767 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -17,7 +17,6 @@
package kafka.admin
-
import java.util.Properties
import joptsimple.{OptionParser, OptionSpec}
@@ -33,7 +32,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.{Set, mutable}
object ConsumerGroupCommand {
@@ -41,306 +40,352 @@ object ConsumerGroupCommand {
def main(args: Array[String]) {
val opts = new ConsumerGroupCommandOptions(args)
- if(args.length == 0)
+ if (args.length == 0)
CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
// should have exactly one action
val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
- if(actions != 1)
+ if (actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
opts.checkArgs()
- val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled())
+ val consumerGroupService = {
+ if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts)
+ else new ZkConsumerGroupService(opts)
+ }
try {
if (opts.options.has(opts.listOpt))
- list(zkUtils, opts)
+ consumerGroupService.list()
else if (opts.options.has(opts.describeOpt))
- describe(zkUtils, opts)
- else if (opts.options.has(opts.deleteOpt))
- delete(zkUtils, opts)
+ consumerGroupService.describe()
+ else if (opts.options.has(opts.deleteOpt)) {
+ consumerGroupService match {
+ case service: ZkConsumerGroupService => service.delete()
+ case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
+ }
+ }
} catch {
case e: Throwable =>
println("Error while executing consumer group command " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
- zkUtils.close()
+ consumerGroupService.close()
}
}
- def list(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
- val useNewConsumer = opts.options.has(opts.newConsumerOpt)
- if (!useNewConsumer)
- zkUtils.getConsumerGroups().foreach(println)
- else {
- val adminClient = createAndGetAdminClient(opts)
- adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
- }
+ private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
+ val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
+ require(configsToBeAdded.forall(config => config.length == 2),
+ "Invalid config: all configs to be added must be in the format \"key=val\".")
+ val props = new Properties
+ configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+ props
}
- def createAndGetAdminClient(opts: ConsumerGroupCommandOptions): AdminClient = {
- AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
- }
+ sealed trait ConsumerGroupService {
- def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
- val useNewConsumer = opts.options.has(opts.newConsumerOpt)
- val group = opts.options.valueOf(opts.groupOpt)
- val configs = parseConfigs(opts)
- val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
- val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
+ def list(): Unit
- println("%s, %s, %s, %s, %s, %s, %s"
- .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
+ def describe() {
+ describeGroup(opts.options.valueOf(opts.groupOpt))
+ }
- if (!useNewConsumer) {
- val topics = zkUtils.getTopicsByConsumerGroup(group)
- if (topics.isEmpty) {
- println("No topic available for consumer group provided")
- } else {
- topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
- }
- } else {
- val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group)
+ def close(): Unit
- if (consumers.isEmpty) {
- println(s"Consumer group, ${group}, does not exist or is rebalancing.")
- } else {
- consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
+ protected def opts: ConsumerGroupCommandOptions
+
+ protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult
+
+ protected def describeGroup(group: String): Unit
+
+ protected def describeTopicPartition(group: String,
+ topicPartitions: Seq[TopicAndPartition],
+ getPartitionOffset: TopicAndPartition => Option[Long],
+ getOwner: TopicAndPartition => Option[String]): Unit = {
+ topicPartitions
+ .sortBy { case topicPartition => topicPartition.partition }
+ .foreach { topicPartition =>
+ describePartition(group, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition),
+ getOwner(topicPartition))
+ }
+ }
+
+ protected def printDescribeHeader() {
+ println("GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER")
+ }
+
+ private def describePartition(group: String,
+ topic: String,
+ partition: Int,
+ offsetOpt: Option[Long],
+ ownerOpt: Option[String]) {
+ def print(logEndOffset: Option[Long]): Unit = {
+ val lag = offsetOpt.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
+ println(Seq(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset.getOrElse("unknown"),
+ lag.getOrElse("unknown"), ownerOpt.getOrElse("none")).mkString(", "))
+ }
+ getLogEndOffset(topic, partition) match {
+ case LogEndOffsetResult.LogEndOffset(logEndOffset) => print(Some(logEndOffset))
+ case LogEndOffsetResult.Unknown => print(None)
+ case LogEndOffsetResult.Ignore =>
}
}
+
}
- def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
- if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) {
- deleteForTopic(zkUtils, opts)
+ class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
+
+ private val zkUtils = {
+ val zkUrl = opts.options.valueOf(opts.zkConnectOpt)
+ ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled)
}
- else if (opts.options.has(opts.groupOpt)) {
- deleteForGroup(zkUtils, opts)
+
+ def close() {
+ zkUtils.close()
}
- else if (opts.options.has(opts.topicOpt)) {
- deleteAllForTopic(zkUtils, opts)
+
+ def list() {
+ zkUtils.getConsumerGroups().foreach(println)
}
- }
- private def deleteForGroup(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
- val groups = opts.options.valuesOf(opts.groupOpt)
- groups.foreach { group =>
- try {
- if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
- println("Deleted all consumer group information for group %s in zookeeper.".format(group))
+ def delete() {
+ if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt))
+ deleteForTopic()
+ else if (opts.options.has(opts.groupOpt))
+ deleteForGroup()
+ else if (opts.options.has(opts.topicOpt))
+ deleteAllForTopic()
+ }
+
+ protected def describeGroup(group: String) {
+ val configs = parseConfigs(opts)
+ val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
+ val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
+ val topics = zkUtils.getTopicsByConsumerGroup(group)
+ if (topics.isEmpty)
+ println("No topic available for consumer group provided")
+ printDescribeHeader()
+ topics.foreach(topic => describeTopic(group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
+ }
+
+ private def describeTopic(group: String,
+ topic: String,
+ channelSocketTimeoutMs: Int,
+ channelRetryBackoffMs: Int) {
+ val topicPartitions = getTopicPartitions(topic)
+ val groupDirs = new ZKGroupTopicDirs(group, topic)
+ val ownerByTopicPartition = topicPartitions.flatMap { topicPartition =>
+ zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + topicPartition.partition)._1.map { owner =>
+ topicPartition -> owner
+ }
+ }.toMap
+ val partitionOffsets = getPartitionOffsets(group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
+ describeTopicPartition(group, topicPartitions, partitionOffsets.get, ownerByTopicPartition.get)
+ }
+
+ private def getTopicPartitions(topic: String): Seq[TopicAndPartition] = {
+ val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
+ val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
+ partitions.map(TopicAndPartition(topic, _))
+ }
+
+ protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
+ zkUtils.getLeaderForPartition(topic, partition) match {
+ case Some(-1) => LogEndOffsetResult.Unknown
+ case Some(brokerId) =>
+ getZkConsumer(brokerId).map { consumer =>
+ val topicAndPartition = new TopicAndPartition(topic, partition)
+ val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+ val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+ consumer.close()
+ LogEndOffsetResult.LogEndOffset(logEndOffset)
+ }.getOrElse(LogEndOffsetResult.Ignore)
+ case None =>
+ println(s"No broker for partition ${new TopicPartition(topic, partition)}")
+ LogEndOffsetResult.Ignore
+ }
+ }
+
+ private def getPartitionOffsets(group: String,
+ topicPartitions: Seq[TopicAndPartition],
+ channelSocketTimeoutMs: Int,
+ channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
+ val offsetMap = mutable.Map[TopicAndPartition, Long]()
+ val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
+ channel.send(OffsetFetchRequest(group, topicPartitions))
+ val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
+
+ offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
+ if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
+ val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
+ // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
+ // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
+ try {
+ val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
+ offsetMap.put(topicAndPartition, offset)
+ } catch {
+ case z: ZkNoNodeException =>
+ println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
+ .format(group, topicAndPartition))
+ }
+ }
+ else if (offsetAndMetadata.error == ErrorMapping.NoError)
+ offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
else
- println("Delete for group %s failed because its consumers are still active.".format(group))
+ println("Could not fetch offset from kafka for group %s partition %s due to %s."
+ .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
}
- catch {
- case e: ZkNoNodeException =>
- println("Delete for group %s failed because group does not exist.".format(group))
+ channel.disconnect()
+ offsetMap.toMap
+ }
+
+ private def deleteForGroup() {
+ val groups = opts.options.valuesOf(opts.groupOpt)
+ groups.asScala.foreach { group =>
+ try {
+ if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
+ println("Deleted all consumer group information for group %s in zookeeper.".format(group))
+ else
+ println("Delete for group %s failed because its consumers are still active.".format(group))
+ }
+ catch {
+ case e: ZkNoNodeException =>
+ println("Delete for group %s failed because group does not exist.".format(group))
+ }
}
}
- }
- private def deleteForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
- val groups = opts.options.valuesOf(opts.groupOpt)
- val topic = opts.options.valueOf(opts.topicOpt)
- Topic.validate(topic)
- groups.foreach { group =>
- try {
- if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
- println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
- else
- println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
+ private def deleteForTopic() {
+ val groups = opts.options.valuesOf(opts.groupOpt)
+ val topic = opts.options.valueOf(opts.topicOpt)
+ Topic.validate(topic)
+ groups.asScala.foreach { group =>
+ try {
+ if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
+ println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
+ else
+ println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
+ }
+ catch {
+ case e: ZkNoNodeException =>
+ println("Delete for group %s topic %s failed because group does not exist.".format(group, topic))
+ }
}
- catch {
- case e: ZkNoNodeException =>
- println("Delete for group %s topic %s failed because group does not exist.".format(group, topic))
+ }
+
+ private def deleteAllForTopic() {
+ val topic = opts.options.valueOf(opts.topicOpt)
+ Topic.validate(topic)
+ AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
+ println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
+ }
+
+ private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
+ try {
+ zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+ case Some(brokerInfoString) =>
+ Json.parseFull(brokerInfoString) match {
+ case Some(m) =>
+ val brokerInfo = m.asInstanceOf[Map[String, Any]]
+ val host = brokerInfo.get("host").get.asInstanceOf[String]
+ val port = brokerInfo.get("port").get.asInstanceOf[Int]
+ Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
+ case None =>
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+ }
+ case None =>
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+ }
+ } catch {
+ case t: Throwable =>
+ println("Could not parse broker info due to " + t.getMessage)
+ None
}
}
- }
- private def deleteAllForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
- val topic = opts.options.valueOf(opts.topicOpt)
- Topic.validate(topic)
- AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
- println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
}
- private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
- val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
- require(configsToBeAdded.forall(config => config.length == 2),
- "Invalid config: all configs to be added must be in the format \"key=val\".")
- val props = new Properties
- configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
- props
- }
+ class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
- private def describeTopic(zkUtils: ZkUtils,
- group: String,
- topic: String,
- channelSocketTimeoutMs: Int,
- channelRetryBackoffMs: Int,
- opts: ConsumerGroupCommandOptions) {
- val topicPartitions = getTopicPartitions(zkUtils, topic)
- describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions)
- }
+ private val adminClient = createAdminClient()
- def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owner: Option[String] = None): Unit = {
- val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
- topicPartitions
- .sortBy { case topicPartition => topicPartition.partition }
- .foreach { topicPartition =>
- describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owner)
- }
- }
+ // `consumer` is only needed for `describe`, so we instantiate it lazily
+ private var consumer: KafkaConsumer[String, String] = null
- private def getTopicPartitions(zkUtils: ZkUtils, topic: String) = {
- val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
- val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
- partitions.map(TopicAndPartition(topic, _))
- }
+ def list() {
+ adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
+ }
- private def getPartitionOffsets(zkUtils: ZkUtils,
- group: String,
- topicPartitions: Seq[TopicAndPartition],
- channelSocketTimeoutMs: Int,
- channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
- val offsetMap = mutable.Map[TopicAndPartition, Long]()
- val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
- channel.send(OffsetFetchRequest(group, topicPartitions))
- val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
-
- offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
- if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
- val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
- // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
- // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
- try {
- val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
- offsetMap.put(topicAndPartition, offset)
- } catch {
- case z: ZkNoNodeException =>
- println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
- .format(group, topicAndPartition))
+ protected def describeGroup(group: String) {
+ val consumerSummaries = adminClient.describeConsumerGroup(group)
+ if (consumerSummaries.isEmpty)
+ println(s"Consumer group `${group}` does not exist or is rebalancing.")
+ else {
+ val consumer = getConsumer()
+ printDescribeHeader()
+ consumerSummaries.foreach { consumerSummary =>
+ val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
+ val partitionOffsets = topicPartitions.flatMap { topicPartition =>
+ Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
+ topicPartition -> offsetAndMetadata.offset
+ }
+ }.toMap
+ describeTopicPartition(group, topicPartitions, partitionOffsets.get,
+ _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
}
}
- else if (offsetAndMetadata.error == ErrorMapping.NoError)
- offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
- else
- println("Could not fetch offset from kafka for group %s partition %s due to %s."
- .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
}
- channel.disconnect()
- offsetMap.toMap
- }
- private def describePartition(zkUtils: ZkUtils,
- group: String,
- topic: String,
- partition: Int,
- offsetOpt: Option[Long],
- opts: ConsumerGroupCommandOptions,
- ownerOpt: Option[String] = None) {
- val topicPartition = new TopicPartition(topic, partition)
- val groupDirs = new ZKGroupTopicDirs(group, topic)
- val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt)
- val owner: Option[String] = if (useNewConsumer) ownerOpt else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
- def print(logEndOffset: Long): Unit = {
- val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
- println("%s, %s, %s, %s, %s, %s, %s"
- .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
+ protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
+ val consumer = getConsumer()
+ val topicPartition = new TopicPartition(topic, partition)
+ consumer.assign(List(topicPartition).asJava)
+ consumer.seekToEnd(topicPartition)
+ val logEndOffset = consumer.position(topicPartition)
+ LogEndOffsetResult.LogEndOffset(logEndOffset)
}
- zkUtils.getLeaderForPartition(topic, partition) match {
- case Some(-1) =>
- println("%s, %s, %s, %s, %s, %s, %s"
- .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
- case Some(brokerId) =>
- if (useNewConsumer) {
- val consumerOpt = getNewConsumer(zkUtils, brokerId)
- consumerOpt match {
- case Some(consumer) =>
- consumer.assign(List(topicPartition))
- consumer.seekToEnd(topicPartition)
- val logEndOffset = consumer.position(topicPartition)
- consumer.close()
- print(logEndOffset)
- case None => // ignore
- }
- } else {
- val consumerOpt = getZkConsumer(zkUtils, brokerId)
- consumerOpt match {
- case Some(consumer) =>
- val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition())
- val request =
- OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
- val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
- consumer.close()
- print(logEndOffset)
- case None => // ignore
- }
- }
- case None =>
- println("No broker for partition %s".format(topicPartition))
+
+ def close() {
+ adminClient.close()
+ if (consumer != null) consumer.close()
}
- }
- private def getNewConsumer(zkUtils: ZkUtils, brokerId: Int): Option[KafkaConsumer[String, String]] = {
- try {
- zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
- case Some(brokerInfoString) =>
- Json.parseFull(brokerInfoString) match {
- case Some(m) =>
- val brokerInfo = m.asInstanceOf[Map[String, Any]]
- val host = brokerInfo.get("host").get.asInstanceOf[String]
- val port = brokerInfo.get("port").get.asInstanceOf[Int]
- val deserializer: String = (new StringDeserializer).getClass.getName
- val properties: Properties = new Properties()
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port)
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand")
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
- Some(new KafkaConsumer[String, String](properties))
- case None =>
- throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
- }
- case None =>
- throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
- }
- } catch {
- case t: Throwable =>
- println("Could not parse broker info due to " + t.getMessage)
- None
+ private def createAdminClient(): AdminClient =
+ AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
+
+ private def getConsumer() = {
+ if (consumer == null)
+ consumer = createNewConsumer()
+ consumer
}
- }
- private def getZkConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
- try {
- zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
- case Some(brokerInfoString) =>
- Json.parseFull(brokerInfoString) match {
- case Some(m) =>
- val brokerInfo = m.asInstanceOf[Map[String, Any]]
- val host = brokerInfo.get("host").get.asInstanceOf[String]
- val port = brokerInfo.get("port").get.asInstanceOf[Int]
- Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
- case None =>
- throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
- }
- case None =>
- throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
- }
- } catch {
- case t: Throwable =>
- println("Could not parse broker info due to " + t.getMessage)
- None
+ private def createNewConsumer(): KafkaConsumer[String, String] = {
+ val properties = new Properties()
+ val deserializer = (new StringDeserializer).getClass.getName
+ val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
+ new KafkaConsumer(properties)
}
+
+ }
+
+ sealed trait LogEndOffsetResult
+
+ object LogEndOffsetResult {
+ case class LogEndOffset(value: Long) extends LogEndOffsetResult
+ case object Unknown extends LogEndOffsetResult
+ case object Ignore extends LogEndOffsetResult
}
class ConsumerGroupCommandOptions(args: Array[String]) {
- val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ val ZkConnectDoc = "REQUIRED (unless new-consumer is used): The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over."
val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
val GroupDoc = "The consumer group we wish to act on."
@@ -390,10 +435,23 @@ object ConsumerGroupCommand {
// check required args
if (options.has(newConsumerOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+
+ if (options.has(zkConnectOpt))
+ CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt")
+
if (options.has(deleteOpt))
- CommandLineUtils.printUsageAndDie(parser, "Option %s does not work with %s".format(deleteOpt, newConsumerOpt))
- } else
+ CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " +
+ "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " +
+ "member leaves")
+
+ } else {
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+
+ if (options.has(bootstrapServerOpt))
+ CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt")
+
+ }
+
if (options.has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))