You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/10 01:49:45 UTC
svn commit: r1167405 -
/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
Author: junrao
Date: Fri Sep 9 23:49:45 2011
New Revision: 1167405
URL: http://svn.apache.org/viewvc?rev=1167405&view=rev
Log:
Tool to watch consumer offsets and lag; patched by Joel Joshy; reviewed by Jun Rao; KAFKA-127
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1167405&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Fri Sep 9 23:49:45 2011
@@ -0,0 +1,147 @@
+
+package kafka.tools
+
+
+import joptsimple._
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZkUtils, ZKStringSerializer}
+import org.apache.log4j.Logger
+import kafka.consumer.SimpleConsumer
+import collection.mutable.Map
+/** Prints owner, consumer offset, log size and lag for each partition of a consumer group. */
+object ConsumerOffsetChecker {
+  private val logger = Logger.getLogger(getClass)
+
+  // One consumer per broker id; None caches a broker whose registration did not parse.
+  private val consumerMap: Map[String, Option[SimpleConsumer]] = Map()
+
+  // Offset node name: <brokerId>-<partitionId>
+  private val BidPidPattern = """(\d+)-(\d+)""".r
+
+  // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
+  // The host group accepts host names as well as dotted IPv4 addresses.
+  private val BrokerIpPattern = """.*:([^:]+):(\d+$)""".r
+
+  /** Creates a consumer for the broker registered under /brokers/ids/<bid>;
+   *  returns None, after logging an error, if the registration cannot be parsed. */
+  private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
+    val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))
+    brokerInfo match {
+      case BrokerIpPattern(host, port) =>
+        Some(new SimpleConsumer(host, port.toInt, 10000, 100000))
+      case _ =>
+        logger.error("Could not parse broker info %s".format(brokerInfo))
+        None
+    }
+  }
+
+  /** Prints `value` under `label`, then again with digit grouping and as a multiple of 1024^3. */
+  private def printSize(label: String, value: Long): Unit = {
+    println("%20s%d".format(label, value))
+    println("%20s%,d (%,.2fG)".format("= ", value, value / math.pow(1024, 3)))
+  }
+
+  /**
+   * Prints the owner and committed offset stored in ZooKeeper for one partition
+   * (`bidPid` is its <brokerId>-<partitionId> node name) and, when a consumer for
+   * its broker is available, the broker-reported log size and the resulting lag.
+   */
+  private def processPartition(zkClient: ZkClient,
+                               group: String, topic: String, bidPid: String): Unit = {
+    val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
+      format(group, topic, bidPid)).toLong
+    val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
+      format(group, topic, bidPid))
+    println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
+    println("%20s%s".format("Owner = ", owner))
+    printSize("Consumer offset = ", offset)
+
+    bidPid match {
+      case BidPidPattern(bid, pid) =>
+        // A broker without a usable consumer was already reported by getConsumer.
+        consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid)).foreach { consumer =>
+          // lastOption: an empty reply is logged instead of aborting the whole run.
+          consumer.getOffsetsBefore(topic, pid.toInt, -1, 1).lastOption.map(_.toLong) match {
+            case Some(logSize) =>
+              printSize("Log size = ", logSize)
+              printSize("Consumer lag = ", logSize - offset)
+              println()
+            case None =>
+              logger.error("No offsets returned for %s,%s,%s".format(group, topic, bidPid))
+          }
+        }
+      case _ =>
+        logger.error("Could not parse broker/partition pair %s".format(bidPid))
+    }
+  }
+
+  /** Processes every partition with an offset node under /consumers/<group>/offsets/<topic>. */
+  private def processTopic(zkClient: ZkClient, group: String, topic: String): Unit = {
+    val bidsPids = ZkUtils.getChildrenParentMayNotExist(
+      zkClient, "/consumers/%s/offsets/%s".format(group, topic)).toList
+    bidsPids.foreach(bidPid => processPartition(zkClient, group, topic, bidPid))
+  }
+
+  /** Prints host and port of every broker for which a consumer was created. */
+  private def printBrokerInfo(): Unit = {
+    println("BROKER INFO")
+    for ((bid, consumerOpt) <- consumerMap; consumer <- consumerOpt)
+      println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
+  }
+
+  /**
+   * Entry point. Options: --zkconnect (default localhost:2181), --group (required),
+   * --topic (comma-separated; all topics of the group if absent), --help.
+   */
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser()
+
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.").
+      withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
+    val topicsOpt = parser.accepts("topic",
+      "Comma-separated list of consumer topics (all topics if absent).").
+      withRequiredArg().ofType(classOf[String])
+    val groupOpt = parser.accepts("group", "Consumer group.").
+      withRequiredArg().ofType(classOf[String])
+    parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    if (!options.has(groupOpt)) {
+      System.err.println("Missing required argument: %s".format(groupOpt))
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val group = options.valueOf(groupOpt)
+    // Trim entries and drop empty ones so "--topic a, b" names topics "a" and "b".
+    val topics = if (options.has(topicsOpt))
+      Some(options.valueOf(topicsOpt).split(",").map(_.trim).filter(_.length > 0).toList)
+    else None
+
+    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+    try {
+      val topicList = topics.getOrElse(
+        ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(group)).toList)
+
+      logger.debug("zkConnect = %s; topics = %s; group = %s".format(
+        zkConnect, topicList.toString(), group))
+
+      topicList.foreach(topic => processTopic(zkClient, group, topic))
+
+      printBrokerInfo()
+    }
+    finally {
+      // Nested finally: the ZooKeeper client is closed even if closing a consumer throws.
+      try for (consumerOpt <- consumerMap.values; consumer <- consumerOpt) consumer.close()
+      finally zkClient.close()
+    }
+  }
+}
+