You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/12 03:26:37 UTC

[1/11] git commit: ConsumerOffsetChecker does not work with 0.8; kafka-685; patched by Maxime Brugidou; reviewed by Jun Rao

ConsumerOffsetChecker does not work with 0.8; kafka-685; patched by Maxime Brugidou; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd94251d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd94251d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd94251d

Branch: refs/heads/trunk
Commit: fd94251d89180cdb6cf815cf2d6f568e15b85c59
Parents: 2f4bfc6
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 8 13:43:53 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 8 13:43:53 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsumerOffsetChecker.scala  |   65 +++++++--------
 1 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd94251d/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index db9acc9..3161435 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -29,66 +29,59 @@ import scala.collection._
 
 object ConsumerOffsetChecker extends Logging {
 
-  private val consumerMap: mutable.Map[String, Option[SimpleConsumer]] = mutable.Map()
+  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
 
-  private val BidPidPattern = """(\d+)-(\d+)""".r
+  private val BrokerIpPattern = """^([^:]+):(\d+).*$""".r
+  // e.g., 127.0.0.1:9092:9999 (with JMX port)
 
-  private val BrokerIpPattern = """.*:([^:]+):(\d+$)""".r
-  // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
-  // e.g., host.domain.com-1315436360737:host.domain.com:9092
-
-  private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
+  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
     val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
     val consumer = brokerInfo match {
-      case BrokerIpPattern(ip, port) =>
+      case Some(BrokerIpPattern(ip, port)) =>
         Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
       case _ =>
-        error("Could not parse broker info %s".format(brokerInfo))
+        error("Could not parse broker info %s with regex %s".format(brokerInfo, BrokerIpPattern.toString()))
         None
     }
     consumer
   }
 
   private def processPartition(zkClient: ZkClient,
-                               group: String, topic: String, bidPid: String) {
+                               group: String, topic: String, pid: Int) {
     val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
-            format(group, topic, bidPid))._1.toLong
+            format(group, topic, pid))._1.toLong
     val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
-            format(group, topic, bidPid))._1
-    println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
-    println("%20s%s".format("Owner = ", owner))
-    println("%20s%d".format("Consumer offset = ", offset))
-    println("%20s%,d (%,.2fG)".format("= ", offset, offset / math.pow(1024, 3)))
-
-    bidPid match {
-      case BidPidPattern(bid, pid) =>
-        val consumerOpt = consumerMap.getOrElseUpdate(
-          bid, getConsumer(zkClient, bid))
+            format(group, topic, pid))._1
+
+    ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
+      case Some(bid) =>
+        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
         consumerOpt match {
           case Some(consumer) =>
-            val topicAndPartition = TopicAndPartition(topic, pid.toInt)
+            val topicAndPartition = TopicAndPartition(topic, pid)
             val request =
               OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
             val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-            println("%20s%d".format("Log size = ", logSize))
-            println("%20s%,d (%,.2fG)".format("= ", logSize, logSize / math.pow(1024, 3)))
 
             val lag = logSize - offset
-            println("%20s%d".format("Consumer lag = ", lag))
-            println("%20s%,d (%,.2fG)".format("= ", lag, lag / math.pow(1024, 3)))
-            println()
+            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag,
+              owner match {case Some(ownerStr) => ownerStr case None => "none"}))
+            consumer.close()
           case None => // ignore
         }
-      case _ =>
-        error("Could not parse broker/partition pair %s".format(bidPid))
+      case None =>
+        error("No broker for partition %s - %s".format(topic, pid))
     }
   }
 
   private def processTopic(zkClient: ZkClient, group: String, topic: String) {
-    val bidsPids = ZkUtils.getChildrenParentMayNotExist(
-      zkClient, "/consumers/%s/offsets/%s".format(group, topic)).toList
-    bidsPids.sorted.foreach {
-      bidPid => processPartition(zkClient, group, topic, bidPid)
+    val pidMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
+    pidMap.get(topic) match {
+      case Some(pids) =>
+        pids.sorted.foreach {
+          pid => processPartition(zkClient, group, topic, pid)
+        }
+      case None => // ignore
     }
   }
 
@@ -112,6 +105,7 @@ object ConsumerOffsetChecker extends Logging {
             withRequiredArg().ofType(classOf[String])
     val groupOpt = parser.accepts("group", "Consumer group.").
             withRequiredArg().ofType(classOf[String])
+    parser.accepts("broker-info", "Print broker info")
     parser.accepts("help", "Print this message.")
 
     val options = parser.parse(args : _*)
@@ -147,11 +141,14 @@ object ConsumerOffsetChecker extends Logging {
       debug("zkConnect = %s; topics = %s; group = %s".format(
         zkConnect, topicList.toString(), group))
 
+      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
       topicList.sorted.foreach {
         topic => processTopic(zkClient, group, topic)
       }
 
-      printBrokerInfo()
+      if (options.has("broker-info"))
+        printBrokerInfo();
+
     }
     finally {
       for (consumerOpt <- consumerMap.values) {