You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:00 UTC
[34/37] git commit: KAFKA-776 Changing ZK format breaks some tools;
reviewed by Neha Narkhede
KAFKA-776 Changing ZK format breaks some tools; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cea27356
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cea27356
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cea27356
Branch: refs/heads/trunk
Commit: cea27356feedb66f91b734c9c63de1ea44ac0d47
Parents: 89622c8
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Feb 27 19:17:43 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Feb 27 19:17:43 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/admin/ShutdownBroker.scala | 36 ++++++++-------
core/src/main/scala/kafka/cluster/Broker.scala | 4 +-
.../scala/kafka/controller/KafkaController.scala | 2 +-
.../scala/kafka/tools/ConsumerOffsetChecker.scala | 32 ++++++++-----
.../main/scala/kafka/tools/ExportZkOffsets.scala | 10 +++-
.../kafka/tools/VerifyConsumerRebalance.scala | 5 ++-
6 files changed, 54 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea27356/core/src/main/scala/kafka/admin/ShutdownBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
index a38a567..bb20edb 100644
--- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala
+++ b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
@@ -25,6 +25,7 @@ import javax.management.remote.{JMXServiceURL, JMXConnectorFactory}
import javax.management.ObjectName
import kafka.controller.KafkaController
import scala.Some
+import kafka.common.BrokerNotAvailableException
object ShutdownBroker extends Logging {
@@ -35,15 +36,22 @@ object ShutdownBroker extends Logging {
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
-
val controllerBrokerId = ZkUtils.getController(zkClient)
ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1 match {
case Some(controllerInfo) =>
- val parsed = controllerInfo.split(":")
- val controllerHost = parsed(0)
- val controllerJmxPort = parsed(2)
- val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"
- .format(controllerHost, controllerJmxPort))
+ var controllerHost: String = null
+ var controllerJmxPort: Int = -1
+ try {
+ Json.parseFull(controllerInfo) match {
+ case Some(m) =>
+ val brokerInfo = m.asInstanceOf[Map[String, Any]]
+ controllerHost = brokerInfo.get("host").get.toString
+ controllerJmxPort = brokerInfo.get("jmx_port").get.asInstanceOf[Int]
+ case None =>
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
+ }
+ }
+ val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(controllerHost, controllerJmxPort))
info("Connecting to jmx url " + jmxUrl)
val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
val mbsc = jmxc.getMBeanServerConnection
@@ -52,21 +60,17 @@ object ShutdownBroker extends Logging {
Array(params.brokerId),
Array(classOf[Int].getName)).asInstanceOf[Int]
val shutdownComplete = (leaderPartitionsRemaining == 0)
- info("Shutdown status: " + (if (shutdownComplete)
- "complete" else
- "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
+ info("Shutdown status: " +
+ (if (shutdownComplete) "complete" else "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
shutdownComplete
case None =>
- error("Operation failed due to controller failure on %d.".format(controllerBrokerId))
- false
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
}
- }
- catch {
+ } catch {
case t: Throwable =>
- error("Operation failed due to %s.".format(t.getMessage), t)
+ error("Operation failed due to controller failure", t)
false
- }
- finally {
+ } finally {
if (zkClient != null)
zkClient.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea27356/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 6e91d29..435c473 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -35,11 +35,11 @@ private[kafka] object Broker {
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
- val host = brokerInfo.get("host").get.toString
+ val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
new Broker(id, host, port)
case None =>
- throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
}
} catch {
case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea27356/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4d253da..48eae7e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -194,7 +194,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
}
- debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
+ debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
partitionsRemaining.size
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea27356/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 3161435..0e6d9b8 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -20,10 +20,10 @@ package kafka.tools
import joptsimple._
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{Json, ZkUtils, ZKStringSerializer, Logging}
import kafka.consumer.SimpleConsumer
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
-import kafka.common.TopicAndPartition
+import kafka.common.{BrokerNotAvailableException, TopicAndPartition}
import scala.collection._
@@ -31,19 +31,27 @@ object ConsumerOffsetChecker extends Logging {
private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
- private val BrokerIpPattern = """^([^:]+):(\d+).*$""".r
- // e.g., 127.0.0.1:9092:9999 (with JMX port)
-
private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
- val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
- val consumer = brokerInfo match {
- case Some(BrokerIpPattern(ip, port)) =>
- Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
- case _ =>
- error("Could not parse broker info %s with regex %s".format(brokerInfo, BrokerIpPattern.toString()))
+ try {
+ ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
+ case Some(brokerInfoString) =>
+ Json.parseFull(brokerInfoString) match {
+ case Some(m) =>
+ val brokerInfo = m.asInstanceOf[Map[String, Any]]
+ val host = brokerInfo.get("host").get.asInstanceOf[String]
+ val port = brokerInfo.get("port").get.asInstanceOf[Int]
+ Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
+ case None =>
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
+ }
+ case None =>
+ throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
+ }
+ } catch {
+ case t: Throwable =>
+ error("Could not parse broker info", t)
None
}
- consumer
}
private def processPartition(zkClient: ZkClient,
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea27356/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 0add886..005231f 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -100,9 +100,13 @@ object ExportZkOffsets extends Logging {
for (bidPid <- bidPidList) {
val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
- val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1
- fileWriter.write(offsetPath + ":" + offsetVal + "\n")
- debug(offsetPath + " => " + offsetVal)
+ ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match {
+ case Some(offsetVal) =>
+ fileWriter.write(offsetPath + ":" + offsetVal + "\n")
+ debug(offsetPath + " => " + offsetVal)
+ case None =>
+ error("Could not retrieve offset value from " + offsetPath)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea27356/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 40ea69d..d9c8bae 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -104,7 +104,10 @@ object VerifyConsumerRebalance extends Logging {
}
// try reading the partition owner path to see if a valid consumer id exists there
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
- val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1
+ val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1 match {
+ case Some(m) => m
+ case None => null
+ }
if(partitionOwner == null) {
error("No owner for topic %s partition %s".format(topic, partition))
rebalanceSucceeded = false