You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:00 UTC

[35/37] git commit: KAFKA-776 Changing ZK format breaks some tools; reviewed by Neha Narkhede

KAFKA-776 Changing ZK format breaks some tools; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db65c957
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db65c957
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db65c957

Branch: refs/heads/trunk
Commit: db65c9573533b5d6f8a3a5cbfdd287ebdcde72e4
Parents: 89622c8
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Wed Feb 27 19:17:43 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Feb 27 19:23:04 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/ShutdownBroker.scala    |   36 ++++++++-------
 core/src/main/scala/kafka/cluster/Broker.scala     |    4 +-
 .../scala/kafka/controller/KafkaController.scala   |    2 +-
 .../scala/kafka/tools/ConsumerOffsetChecker.scala  |   32 ++++++++-----
 .../main/scala/kafka/tools/ExportZkOffsets.scala   |   10 +++-
 .../kafka/tools/VerifyConsumerRebalance.scala      |    5 ++-
 6 files changed, 54 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/db65c957/core/src/main/scala/kafka/admin/ShutdownBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
index a38a567..bb20edb 100644
--- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala
+++ b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
@@ -25,6 +25,7 @@ import javax.management.remote.{JMXServiceURL, JMXConnectorFactory}
 import javax.management.ObjectName
 import kafka.controller.KafkaController
 import scala.Some
+import kafka.common.BrokerNotAvailableException
 
 
 object ShutdownBroker extends Logging {
@@ -35,15 +36,22 @@ object ShutdownBroker extends Logging {
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
-
       val controllerBrokerId = ZkUtils.getController(zkClient)
       ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1 match {
         case Some(controllerInfo) =>
-          val parsed = controllerInfo.split(":")
-          val controllerHost = parsed(0)
-          val controllerJmxPort = parsed(2)
-          val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"
-                                         .format(controllerHost, controllerJmxPort))
+          var controllerHost: String = null
+          var controllerJmxPort: Int = -1
+          try {
+            Json.parseFull(controllerInfo) match {
+              case Some(m) =>
+                val brokerInfo = m.asInstanceOf[Map[String, Any]]
+                controllerHost = brokerInfo.get("host").get.toString
+                controllerJmxPort = brokerInfo.get("jmx_port").get.asInstanceOf[Int]
+              case None =>
+                throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
+            }
+          }
+          val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(controllerHost, controllerJmxPort))
           info("Connecting to jmx url " + jmxUrl)
           val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
           val mbsc = jmxc.getMBeanServerConnection
@@ -52,21 +60,17 @@ object ShutdownBroker extends Logging {
                                                       Array(params.brokerId),
                                                       Array(classOf[Int].getName)).asInstanceOf[Int]
           val shutdownComplete = (leaderPartitionsRemaining == 0)
-          info("Shutdown status: " + (if (shutdownComplete)
-            "complete" else
-            "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
+          info("Shutdown status: " +
+            (if (shutdownComplete) "complete" else "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
           shutdownComplete
         case None =>
-          error("Operation failed due to controller failure on %d.".format(controllerBrokerId))
-          false
+          throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
       }
-    }
-    catch {
+    } catch {
       case t: Throwable =>
-        error("Operation failed due to %s.".format(t.getMessage), t)
+        error("Operation failed due to controller failure", t)
         false
-    }
-    finally {
+    } finally {
       if (zkClient != null)
         zkClient.close()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/db65c957/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 6e91d29..435c473 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -35,11 +35,11 @@ private[kafka] object Broker {
       Json.parseFull(brokerInfoString) match {
         case Some(m) =>
           val brokerInfo = m.asInstanceOf[Map[String, Any]]
-          val host = brokerInfo.get("host").get.toString
+          val host = brokerInfo.get("host").get.asInstanceOf[String]
           val port = brokerInfo.get("port").get.asInstanceOf[Int]
           new Broker(id, host, port)
         case None =>
-          throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
+          throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
       }
     } catch {
       case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)

http://git-wip-us.apache.org/repos/asf/kafka/blob/db65c957/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4d253da..48eae7e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -194,7 +194,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
       }
 
-      debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
+      debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
       partitionsRemaining.size
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/db65c957/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 3161435..0e6d9b8 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -20,10 +20,10 @@ package kafka.tools
 
 import joptsimple._
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{Json, ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
-import kafka.common.TopicAndPartition
+import kafka.common.{BrokerNotAvailableException, TopicAndPartition}
 import scala.collection._
 
 
@@ -31,19 +31,27 @@ object ConsumerOffsetChecker extends Logging {
 
   private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
 
-  private val BrokerIpPattern = """^([^:]+):(\d+).*$""".r
-  // e.g., 127.0.0.1:9092:9999 (with JMX port)
-
   private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
-    val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
-    val consumer = brokerInfo match {
-      case Some(BrokerIpPattern(ip, port)) =>
-        Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
-      case _ =>
-        error("Could not parse broker info %s with regex %s".format(brokerInfo, BrokerIpPattern.toString()))
+    try {
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
+        case Some(brokerInfoString) =>
+          Json.parseFull(brokerInfoString) match {
+            case Some(m) =>
+              val brokerInfo = m.asInstanceOf[Map[String, Any]]
+              val host = brokerInfo.get("host").get.asInstanceOf[String]
+              val port = brokerInfo.get("port").get.asInstanceOf[Int]
+              Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
+            case None =>
+              throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
+          }
+        case None =>
+          throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
+      }
+    } catch {
+      case t: Throwable =>
+        error("Could not parse broker info", t)
         None
     }
-    consumer
   }
 
   private def processPartition(zkClient: ZkClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/db65c957/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 0add886..005231f 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -100,9 +100,13 @@ object ExportZkOffsets extends Logging {
           for (bidPid <- bidPidList) {
             val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
             val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
-            val offsetVal  = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1
-            fileWriter.write(offsetPath + ":" + offsetVal + "\n")
-            debug(offsetPath + " => " + offsetVal)
+            ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match {
+              case Some(offsetVal) =>
+                fileWriter.write(offsetPath + ":" + offsetVal + "\n")
+                debug(offsetPath + " => " + offsetVal)
+              case None =>
+                error("Could not retrieve offset value from " + offsetPath)
+            }
           }
         }
       }      

http://git-wip-us.apache.org/repos/asf/kafka/blob/db65c957/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 40ea69d..d9c8bae 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -104,7 +104,10 @@ object VerifyConsumerRebalance extends Logging {
         }
         // try reading the partition owner path for see if a valid consumer id exists there
         val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1
+        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1 match {
+          case Some(m) => m
+          case None => null
+        }
         if(partitionOwner == null) {
           error("No owner for topic %s partition %s".format(topic, partition))
           rebalanceSucceeded = false