You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/04 22:43:53 UTC
[2/5] git commit: KAFKA-668 Store jmx port in broker zk string -
controlled shutdown admin tool should not require controller JMX url/port to
be supplied. Reviewed by Jun Rao.
KAFKA-668: Store the JMX port in the broker's ZooKeeper registration string, so that the controlled shutdown admin tool does not require the controller's JMX URL/port to be supplied. Reviewed by Jun Rao.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/25d77cc6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/25d77cc6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/25d77cc6
Branch: refs/heads/trunk
Commit: 25d77cc69c5f12394f55e22516fc90fd3c272166
Parents: c6d4110
Author: Joel Koshy <jj...@apache.org>
Authored: Wed Jan 2 14:08:46 2013 -0800
Committer: Joel Koshy <jj...@apache.org>
Committed: Wed Jan 2 14:08:46 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/admin/ShutdownBroker.scala | 33 +++++++--------
.../main/scala/kafka/server/KafkaZooKeeper.scala | 3 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
4 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/main/scala/kafka/admin/ShutdownBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
index 9571fd5..a38a567 100644
--- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala
+++ b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
@@ -29,27 +29,32 @@ import scala.Some
object ShutdownBroker extends Logging {
- private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer, jmxUrl: String)
+ private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer)
private def invokeShutdown(params: ShutdownParams): Boolean = {
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
+
val controllerBrokerId = ZkUtils.getController(zkClient)
- val controllerOpt = ZkUtils.getBrokerInfo(zkClient, controllerBrokerId)
- controllerOpt match {
- case Some(controller) =>
- val jmxUrl = new JMXServiceURL(params.jmxUrl)
+ ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1 match {
+ case Some(controllerInfo) =>
+ val parsed = controllerInfo.split(":")
+ val controllerHost = parsed(0)
+ val controllerJmxPort = parsed(2)
+ val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"
+ .format(controllerHost, controllerJmxPort))
+ info("Connecting to jmx url " + jmxUrl)
val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
val mbsc = jmxc.getMBeanServerConnection
val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName),
- "shutdownBroker",
- Array(params.brokerId),
- Array(classOf[Int].getName)).asInstanceOf[Int]
+ "shutdownBroker",
+ Array(params.brokerId),
+ Array(classOf[Int].getName)).asInstanceOf[Int]
val shutdownComplete = (leaderPartitionsRemaining == 0)
info("Shutdown status: " + (if (shutdownComplete)
- "complete" else
- "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
+ "complete" else
+ "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
shutdownComplete
case None =>
error("Operation failed due to controller failure on %d.".format(controllerBrokerId))
@@ -88,11 +93,6 @@ object ShutdownBroker extends Logging {
.describedAs("retry interval in ms (> 1000)")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
- val jmxUrlOpt = parser.accepts("jmx.url", "Controller's JMX URL.")
- .withRequiredArg
- .describedAs("JMX url.")
- .ofType(classOf[String])
- .defaultsTo("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
val options = parser.parse(args : _*)
CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt)
@@ -100,8 +100,7 @@ object ShutdownBroker extends Logging {
val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000)
val numRetries = options.valueOf(numRetriesOpt).intValue
- val shutdownParams =
- ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt), options.valueOf(jmxUrlOpt))
+ val shutdownParams = ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt))
if (!invokeShutdown(shutdownParams)) {
(1 to numRetries).takeWhile(attempt => {
http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 42f8239..0e6c656 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -47,7 +47,8 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
InetAddress.getLocalHost.getCanonicalHostName
else
config.hostName
- ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port)
+ val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+ ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index eabedd0..5ba5938 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -180,11 +180,11 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
- def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int) {
+ def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val broker = new Broker(id, host, port)
try {
- createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
+ createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString + ":" + jmxPort)
} catch {
case e: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index cebb371..a508895 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -334,7 +334,7 @@ object TestUtils extends Logging {
def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
val brokers = ids.map(id => new Broker(id, "localhost", 6667))
- brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port))
+ brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, jmxPort = -1))
brokers
}