You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/04 22:43:53 UTC

[2/5] git commit: KAFKA-668 Store jmx port in broker zk string - controlled shutdown admin tool should not require controller JMX url/port to be supplied. Reviewed by Jun Rao.

KAFKA-668 Store jmx port in broker zk string - controlled shutdown admin tool should not require controller JMX url/port to be supplied. Reviewed by Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/25d77cc6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/25d77cc6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/25d77cc6

Branch: refs/heads/trunk
Commit: 25d77cc69c5f12394f55e22516fc90fd3c272166
Parents: c6d4110
Author: Joel Koshy <jj...@apache.org>
Authored: Wed Jan 2 14:08:46 2013 -0800
Committer: Joel Koshy <jj...@apache.org>
Committed: Wed Jan 2 14:08:46 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/ShutdownBroker.scala    |   33 +++++++--------
 .../main/scala/kafka/server/KafkaZooKeeper.scala   |    3 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |    2 +-
 4 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/main/scala/kafka/admin/ShutdownBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
index 9571fd5..a38a567 100644
--- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala
+++ b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
@@ -29,27 +29,32 @@ import scala.Some
 
 object ShutdownBroker extends Logging {
 
-  private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer, jmxUrl: String)
+  private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer)
 
   private def invokeShutdown(params: ShutdownParams): Boolean = {
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
+
       val controllerBrokerId = ZkUtils.getController(zkClient)
-      val controllerOpt = ZkUtils.getBrokerInfo(zkClient, controllerBrokerId)
-      controllerOpt match {
-        case Some(controller) =>
-          val jmxUrl = new JMXServiceURL(params.jmxUrl)
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1 match {
+        case Some(controllerInfo) =>
+          val parsed = controllerInfo.split(":")
+          val controllerHost = parsed(0)
+          val controllerJmxPort = parsed(2)
+          val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"
+                                         .format(controllerHost, controllerJmxPort))
+          info("Connecting to jmx url " + jmxUrl)
           val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
           val mbsc = jmxc.getMBeanServerConnection
           val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName),
-            "shutdownBroker",
-            Array(params.brokerId),
-            Array(classOf[Int].getName)).asInstanceOf[Int]
+                                                      "shutdownBroker",
+                                                      Array(params.brokerId),
+                                                      Array(classOf[Int].getName)).asInstanceOf[Int]
           val shutdownComplete = (leaderPartitionsRemaining == 0)
           info("Shutdown status: " + (if (shutdownComplete)
-                  "complete" else
-                  "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
+            "complete" else
+            "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
           shutdownComplete
         case None =>
           error("Operation failed due to controller failure on %d.".format(controllerBrokerId))
@@ -88,11 +93,6 @@ object ShutdownBroker extends Logging {
             .describedAs("retry interval in ms (> 1000)")
             .ofType(classOf[java.lang.Integer])
             .defaultsTo(1000)
-    val jmxUrlOpt = parser.accepts("jmx.url", "Controller's JMX URL.")
-            .withRequiredArg
-            .describedAs("JMX url.")
-            .ofType(classOf[String])
-            .defaultsTo("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
 
     val options = parser.parse(args : _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt)
@@ -100,8 +100,7 @@ object ShutdownBroker extends Logging {
     val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000)
     val numRetries = options.valueOf(numRetriesOpt).intValue
 
-    val shutdownParams =
-      ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt), options.valueOf(jmxUrlOpt))
+    val shutdownParams = ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt))
 
     if (!invokeShutdown(shutdownParams)) {
       (1 to numRetries).takeWhile(attempt => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 42f8239..0e6c656 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -47,7 +47,8 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
         InetAddress.getLocalHost.getCanonicalHostName 
       else
         config.hostName 
-    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port)
+    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index eabedd0..5ba5938 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -180,11 +180,11 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, host, port)
     try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
+      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString + ":" + jmxPort)
     } catch {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/25d77cc6/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index cebb371..a508895 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -334,7 +334,7 @@ object TestUtils extends Logging {
 
   def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
     val brokers = ids.map(id => new Broker(id, "localhost", 6667))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, jmxPort = -1))
     brokers
   }