You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/08 22:44:20 UTC
git commit: ConsumerOffsetChecker does not work with 0.8; kafka-685; patched by Maxime Brugidou; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 2f4bfc645 -> fd94251d8
ConsumerOffsetChecker does not work with 0.8; kafka-685; patched by Maxime Brugidou; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd94251d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd94251d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd94251d
Branch: refs/heads/0.8
Commit: fd94251d89180cdb6cf815cf2d6f568e15b85c59
Parents: 2f4bfc6
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 8 13:43:53 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 8 13:43:53 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/tools/ConsumerOffsetChecker.scala | 65 +++++++--------
1 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fd94251d/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index db9acc9..3161435 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -29,66 +29,59 @@ import scala.collection._
object ConsumerOffsetChecker extends Logging {
- private val consumerMap: mutable.Map[String, Option[SimpleConsumer]] = mutable.Map()
+ private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
- private val BidPidPattern = """(\d+)-(\d+)""".r
+ private val BrokerIpPattern = """^([^:]+):(\d+).*$""".r
+ // e.g., 127.0.0.1:9092:9999 (with JMX port)
- private val BrokerIpPattern = """.*:([^:]+):(\d+$)""".r
- // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
- // e.g., host.domain.com-1315436360737:host.domain.com:9092
-
- private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
+ private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
val consumer = brokerInfo match {
- case BrokerIpPattern(ip, port) =>
+ case Some(BrokerIpPattern(ip, port)) =>
Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
case _ =>
- error("Could not parse broker info %s".format(brokerInfo))
+ error("Could not parse broker info %s with regex %s".format(brokerInfo, BrokerIpPattern.toString()))
None
}
consumer
}
private def processPartition(zkClient: ZkClient,
- group: String, topic: String, bidPid: String) {
+ group: String, topic: String, pid: Int) {
val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
- format(group, topic, bidPid))._1.toLong
+ format(group, topic, pid))._1.toLong
val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
- format(group, topic, bidPid))._1
- println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
- println("%20s%s".format("Owner = ", owner))
- println("%20s%d".format("Consumer offset = ", offset))
- println("%20s%,d (%,.2fG)".format("= ", offset, offset / math.pow(1024, 3)))
-
- bidPid match {
- case BidPidPattern(bid, pid) =>
- val consumerOpt = consumerMap.getOrElseUpdate(
- bid, getConsumer(zkClient, bid))
+ format(group, topic, pid))._1
+
+ ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
+ case Some(bid) =>
+ val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
consumerOpt match {
case Some(consumer) =>
- val topicAndPartition = TopicAndPartition(topic, pid.toInt)
+ val topicAndPartition = TopicAndPartition(topic, pid)
val request =
OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
- println("%20s%d".format("Log size = ", logSize))
- println("%20s%,d (%,.2fG)".format("= ", logSize, logSize / math.pow(1024, 3)))
val lag = logSize - offset
- println("%20s%d".format("Consumer lag = ", lag))
- println("%20s%,d (%,.2fG)".format("= ", lag, lag / math.pow(1024, 3)))
- println()
+ println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag,
+ owner match {case Some(ownerStr) => ownerStr case None => "none"}))
+ consumer.close()
case None => // ignore
}
- case _ =>
- error("Could not parse broker/partition pair %s".format(bidPid))
+ case None =>
+ error("No broker for partition %s - %s".format(topic, pid))
}
}
private def processTopic(zkClient: ZkClient, group: String, topic: String) {
- val bidsPids = ZkUtils.getChildrenParentMayNotExist(
- zkClient, "/consumers/%s/offsets/%s".format(group, topic)).toList
- bidsPids.sorted.foreach {
- bidPid => processPartition(zkClient, group, topic, bidPid)
+ val pidMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
+ pidMap.get(topic) match {
+ case Some(pids) =>
+ pids.sorted.foreach {
+ pid => processPartition(zkClient, group, topic, pid)
+ }
+ case None => // ignore
}
}
@@ -112,6 +105,7 @@ object ConsumerOffsetChecker extends Logging {
withRequiredArg().ofType(classOf[String])
val groupOpt = parser.accepts("group", "Consumer group.").
withRequiredArg().ofType(classOf[String])
+ parser.accepts("broker-info", "Print broker info")
parser.accepts("help", "Print this message.")
val options = parser.parse(args : _*)
@@ -147,11 +141,14 @@ object ConsumerOffsetChecker extends Logging {
debug("zkConnect = %s; topics = %s; group = %s".format(
zkConnect, topicList.toString(), group))
+ println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
topicList.sorted.foreach {
topic => processTopic(zkClient, group, topic)
}
- printBrokerInfo()
+ if (options.has("broker-info"))
+ printBrokerInfo();
+
}
finally {
for (consumerOpt <- consumerMap.values) {