Posted to commits@kafka.apache.org by jq...@apache.org on 2016/12/20 02:35:49 UTC

kafka git commit: KAFKA-4447; Controller resigned but it also acts as a controller for a long time

Repository: kafka
Updated Branches:
  refs/heads/trunk 8b84d14c6 -> a786be947


KAFKA-4447; Controller resigned but it also acts as a controller for a long time

Author: Ismael Juma <is...@juma.me.uk>
Author: xiguantiaozhan <ka...@126.com>
Author: tuyang <tu...@meituan.com>

Reviewers: Jiangjie Qin <be...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Onur Karaman <ok...@linkedin.com>

Closes #2191 from xiguantiaozhan/avoid_swamp_controllerLog


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a786be94
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a786be94
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a786be94

Branch: refs/heads/trunk
Commit: a786be94788816bbce32a6cd6ffcf8949ed95556
Parents: 8b84d14
Author: Json Tu <ka...@126.com>
Authored: Mon Dec 19 18:35:30 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Mon Dec 19 18:35:30 2016 -0800

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   |   8 +-
 .../consumer/ZookeeperTopicEventWatcher.scala   |   6 +-
 .../kafka/controller/ControllerZkListener.scala |  62 +++++++++++
 .../kafka/controller/KafkaController.scala      | 105 ++++++++-----------
 .../controller/PartitionStateMachine.scala      |  62 +++++------
 .../kafka/controller/ReplicaStateMachine.scala  |  16 +--
 .../src/main/scala/kafka/server/KafkaApis.scala |   4 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |   4 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |  11 +-
 .../controller/ControllerFailoverTest.scala     |   2 +-
 .../unit/kafka/server/BaseRequestTest.scala     |   4 +-
 .../unit/kafka/server/MetadataRequestTest.scala |   4 +-
 12 files changed, 156 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0b89477..4ef32e9 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -497,7 +497,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                  val topicCount: TopicCount,
                                  val loadBalancerListener: ZKRebalancerListener)
     extends IZkStateListener {
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
     }
@@ -509,7 +509,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
      * @throws Exception
      *             On any error.
      */
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleNewSession() {
       /**
        *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
@@ -545,7 +545,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
     }
 
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleDataDeleted(dataPath : String) {
       // TODO: This need to be implemented when we support delete topic
       warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time")
@@ -597,7 +597,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
     watcherExecutorThread.start()
 
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
       rebalanceEventTriggered()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index b9f2d41..d00f465 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -59,7 +59,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
 
   class ZkTopicEventListener extends IZkChildListener {
 
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleChildChange(parent: String, children: java.util.List[String]) {
       lock.synchronized {
         try {
@@ -81,10 +81,10 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
   class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener)
     extends IZkStateListener {
 
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleStateChanged(state: KeeperState) { }
 
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleNewSession() {
       lock.synchronized {
         if (zkUtils != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/controller/ControllerZkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerZkListener.scala b/core/src/main/scala/kafka/controller/ControllerZkListener.scala
new file mode 100644
index 0000000..f7557ed
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ControllerZkListener.scala
@@ -0,0 +1,62 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.controller
+
+import kafka.utils.Logging
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener}
+
+import scala.collection.JavaConverters._
+
+trait ControllerZkListener extends Logging {
+  logIdent = s"[$logName on Controller " + controller.config.brokerId + "]: "
+  protected def logName: String
+  protected def controller: KafkaController
+}
+
+trait ControllerZkChildListener extends IZkChildListener with ControllerZkListener {
+  @throws[Exception]
+  final def handleChildChange(parentPath: String, currentChildren: java.util.List[String]): Unit = {
+    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
+    if (controller.isActive)
+      doHandleChildChange(parentPath, currentChildren.asScala)
+  }
+
+  @throws[Exception]
+  def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit
+}
+
+trait ControllerZkDataListener extends IZkDataListener with ControllerZkListener {
+  @throws[Exception]
+  final def handleDataChange(dataPath: String, data: AnyRef): Unit = {
+    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
+    if (controller.isActive)
+      doHandleDataChange(dataPath, data)
+  }
+
+  @throws[Exception]
+  def doHandleDataChange(dataPath: String, data: AnyRef): Unit
+
+  @throws[Exception]
+  final def handleDataDeleted(dataPath: String): Unit = {
+    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
+    if (controller.isActive)
+      doHandleDataDeleted(dataPath)
+  }
+
+  @throws[Exception]
+  def doHandleDataDeleted(dataPath: String): Unit
+}
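
A concrete listener built on the new traits only implements the do* callback; the final handleChildChange above performs the controller.isActive check, so a broker that has already resigned the controllership drops stale ZooKeeper notifications instead of handling them (the behaviour described in KAFKA-4447). A minimal sketch of such a listener, with a hypothetical class name and registration path that are not part of this commit:

  package kafka.controller

  // Illustrative only: a child listener using the new base trait. The
  // inherited (final) handleChildChange invokes doHandleChildChange only
  // while controller.isActive is true, so callbacks delivered after this
  // broker resigns the controllership are silently ignored.
  class ExampleChildListener(protected val controller: KafkaController) extends ControllerZkChildListener {

    protected def logName = "ExampleChildListener"

    @throws[Exception]
    def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit =
      info("Children of %s changed to %s".format(parentPath, currentChildren.mkString(",")))
  }

  // Registration is unchanged from a plain IZkChildListener (somePath is a placeholder):
  //   zkUtils.zkClient.subscribeChildChanges(somePath, new ExampleChildListener(controller))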

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d3137c3..0aec81d 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,8 +16,6 @@
  */
 package kafka.controller
 
-import java.util
-
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
@@ -39,7 +37,7 @@ import kafka.utils.CoreUtils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
 import java.util.concurrent.locks.ReentrantLock
 
@@ -152,7 +150,7 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -187,7 +185,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     new Gauge[Int] {
       def value(): Int = {
         inLock(controllerContext.controllerLock) {
-          if (!isActive())
+          if (!isActive)
             0
           else
             controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
@@ -201,7 +199,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     new Gauge[Int] {
       def value(): Int = {
         inLock(controllerContext.controllerLock) {
-          if (!isActive())
+          if (!isActive)
             0
           else
             controllerContext.partitionReplicaAssignment.count {
@@ -230,7 +228,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
    */
   def shutdownBroker(id: Int) : Set[TopicAndPartition] = {
 
-    if (!isActive()) {
+    if (!isActive) {
       throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
     }
 
@@ -398,7 +396,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
   /**
    * Returns true if this broker is the current controller.
    */
-  def isActive(): Boolean = {
+  def isActive: Boolean = {
     inLock(controllerContext.controllerLock) {
       controllerContext.controllerChannelManager != null
     }
@@ -1150,7 +1148,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
 
   class SessionExpirationListener() extends IZkStateListener with Logging {
     this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
-    @throws(classOf[Exception])
+
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
     }
@@ -1161,7 +1159,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
      *
      * @throws Exception On any error.
      */
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleNewSession() {
       info("ZK expired; shut down all controller components and try to re-elect")
       onControllerResignation()
@@ -1170,13 +1168,13 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       }
     }
 
-    override def handleSessionEstablishmentError(error: Throwable): Unit = {
+    def handleSessionEstablishmentError(error: Throwable): Unit = {
       //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
     }
   }
 
   private def checkAndTriggerPartitionRebalance(): Unit = {
-    if (isActive()) {
+    if (isActive) {
       trace("checking need to trigger partition rebalance")
       // get all the active brokers
       var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
@@ -1234,18 +1232,18 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
  * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
  * partitions.
  */
-class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
-  this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
-  val zkUtils = controller.controllerContext.zkUtils
-  val controllerContext = controller.controllerContext
+class PartitionsReassignedListener(protected val controller: KafkaController) extends ControllerZkDataListener {
+  private val controllerContext = controller.controllerContext
+
+  protected def logName = "PartitionsReassignedListener"
 
   /**
    * Invoked when some partitions are reassigned by the admin command
    *
    * @throws Exception On any error.
    */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
+  @throws[Exception]
+  def doHandleDataChange(dataPath: String, data: AnyRef) {
     debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
       .format(dataPath, data))
     val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
@@ -1266,30 +1264,20 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
     }
   }
 
-  /**
-   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
-   *
-   * @throws Exception On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
+  def doHandleDataDeleted(dataPath: String) {}
 }
 
-class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
-                                            reassignedReplicas: Set[Int])
-  extends IZkDataListener with Logging {
-  this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
-  val zkUtils = controller.controllerContext.zkUtils
-  val controllerContext = controller.controllerContext
+class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaController, topic: String, partition: Int,
+                                            reassignedReplicas: Set[Int]) extends ControllerZkDataListener {
+  private val zkUtils = controller.controllerContext.zkUtils
+  private val controllerContext = controller.controllerContext
+
+  protected def logName = "ReassignedPartitionsIsrChangeListener"
 
   /**
    * Invoked when some partitions need to move leader to preferred replica
-   *
-   * @throws Exception On any error.
    */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
+  def doHandleDataChange(dataPath: String, data: AnyRef) {
     inLock(controllerContext.controllerLock) {
       debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
       val topicAndPartition = TopicAndPartition(topic, partition)
@@ -1325,13 +1313,8 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
     }
   }
 
-  /**
-   * @throws Exception
-   *             On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
+  def doHandleDataDeleted(dataPath: String) {}
+
 }
 
 /**
@@ -1339,23 +1322,22 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
  *
  * @param controller
  */
-class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
+class IsrChangeNotificationListener(protected val controller: KafkaController) extends ControllerZkChildListener {
 
-  override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
-    import scala.collection.JavaConverters._
+  protected def logName = "IsrChangeNotificationListener"
 
+  def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = {
     inLock(controller.controllerContext.controllerLock) {
-      debug("[IsrChangeNotificationListener] Fired!!!")
-      val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
+      debug("ISR change notification listener fired")
       try {
-        val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet
+        val topicAndPartitions = currentChildren.flatMap(getTopicAndPartition).toSet
         if (topicAndPartitions.nonEmpty) {
           controller.updateLeaderAndIsrCache(topicAndPartitions)
           processUpdateNotifications(topicAndPartitions)
         }
       } finally {
         // delete processed children
-        childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
+        currentChildren.map(x => controller.controllerContext.zkUtils.deletePath(
           ZkUtils.IsrChangeNotificationPath + "/" + x))
       }
     }
@@ -1404,28 +1386,28 @@ object IsrChangeNotificationListener {
  * Starts the preferred replica leader election for the list of partitions specified under
  * /admin/preferred_replica_election -
  */
-class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
-  this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
-  val zkUtils = controller.controllerContext.zkUtils
-  val controllerContext = controller.controllerContext
+class PreferredReplicaElectionListener(protected val controller: KafkaController) extends ControllerZkDataListener {
+  private val controllerContext = controller.controllerContext
+
+  protected def logName = "PreferredReplicaElectionListener"
 
   /**
    * Invoked when some partitions are reassigned by the admin command
    *
    * @throws Exception On any error.
    */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
+  @throws[Exception]
+  def doHandleDataChange(dataPath: String, data: AnyRef) {
     debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
             .format(dataPath, data.toString))
     inLock(controllerContext.controllerLock) {
       val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
-      if(controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
+      if (controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
         info("These partitions are already undergoing preferred replica election: %s"
           .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
       val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
       val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-      if(partitionsForTopicsToBeDeleted.nonEmpty) {
+      if (partitionsForTopicsToBeDeleted.nonEmpty) {
         error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
           .format(partitionsForTopicsToBeDeleted))
       }
@@ -1433,12 +1415,7 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
     }
   }
 
-  /**
-   * @throws Exception On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
+  def doHandleDataDeleted(dataPath: String) {}
 }
 
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
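
The data-watch listeners converted above (PartitionsReassignedListener, ReassignedPartitionsIsrChangeListener, PreferredReplicaElectionListener) all follow the same shape: pass the controller in as a protected constructor val, supply a log name, and implement only doHandleDataChange/doHandleDataDeleted, leaving the isActive guard to ControllerZkDataListener. A minimal sketch with a hypothetical listener name and watch path (not part of this commit):

  package kafka.controller

  // Illustrative only: the inherited (final) handleDataChange and
  // handleDataDeleted call these do* methods only while controller.isActive
  // is true, so a resigned controller no longer reacts to the notification.
  class ExampleDataListener(protected val controller: KafkaController) extends ControllerZkDataListener {

    protected def logName = "ExampleDataListener"

    def doHandleDataChange(dataPath: String, data: AnyRef): Unit =
      info("Data at %s changed to %s".format(dataPath, data))

    def doHandleDataDeleted(dataPath: String): Unit =
      info("Data at %s was deleted".format(dataPath))
  }

  // Registered like any IZkDataListener ("/some/path" is a placeholder):
  //   controller.controllerContext.zkUtils.zkClient.subscribeDataChanges("/some/path", new ExampleDataListener(controller))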

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a5285c3..c0b94b1 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,13 +17,11 @@
 package kafka.controller
 
 import collection._
-import collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.utils.{Logging, ReplicationUtils}
 import kafka.utils.ZkUtils._
-import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.controller.Callbacks.CallbackBuilder
 import kafka.utils.CoreUtils._
@@ -48,8 +46,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
-  private val topicChangeListener = new TopicChangeListener()
-  private val deleteTopicsListener = new DeleteTopicsListener()
+  private val topicChangeListener = new TopicChangeListener(controller)
+  private val deleteTopicsListener = new DeleteTopicsListener(controller)
   private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
@@ -375,7 +373,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   def registerPartitionChangeListener(topic: String) = {
-    partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic))
+    partitionModificationsListeners.put(topic, new PartitionModificationsListener(controller, topic))
     zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
   }
 
@@ -406,17 +404,17 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   /**
    * This is the zookeeper listener that triggers all the state transitions for a partition
    */
-  class TopicChangeListener extends IZkChildListener with Logging {
-    this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
+  class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
 
-    @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+    protected def logName = "TopicChangeListener"
+
+    def doHandleChildChange(parentPath: String, children: Seq[String]) {
       inLock(controllerContext.controllerLock) {
         if (hasStarted.get) {
           try {
             val currentChildren = {
-              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.asScala.mkString(",")))
-              children.asScala.toSet
+              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+              children.toSet
             }
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
@@ -431,7 +429,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             if (newTopics.nonEmpty)
               controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
           } catch {
-            case e: Throwable => error("Error while handling new topic", e )
+            case e: Throwable => error("Error while handling new topic", e)
           }
         }
       }
@@ -443,21 +441,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists
    * 2. If there are topics to be deleted, it signals the delete topic thread
    */
-  class DeleteTopicsListener() extends IZkChildListener with Logging {
-    this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
-    val zkUtils = controllerContext.zkUtils
+  class DeleteTopicsListener(protected val controller: KafkaController) extends ControllerZkChildListener {
+    private val zkUtils = controllerContext.zkUtils
+
+    protected def logName = "DeleteTopicsListener"
 
     /**
      * Invoked when a topic is being deleted
      * @throws Exception On any error.
      */
-    @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+    @throws[Exception]
+    def doHandleChildChange(parentPath: String, children: Seq[String]) {
       inLock(controllerContext.controllerLock) {
-        var topicsToBeDeleted = children.asScala.toSet
+        var topicsToBeDeleted = children.toSet
         debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
         val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
-        if(nonExistentTopics.nonEmpty) {
+        if (nonExistentTopics.nonEmpty) {
           warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
           nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
         }
@@ -481,29 +480,20 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
           for (topic <- topicsToBeDeleted) {
             info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
-            val zkUtils = controllerContext.zkUtils
             zkUtils.zkClient.delete(getDeleteTopicPath(topic))
           }
         }
       }
     }
 
-    /**
-     *
-     * @throws Exception
-   *             On any error.
-     */
-    @throws(classOf[Exception])
-    def handleDataDeleted(dataPath: String) {
-    }
+    def doHandleDataDeleted(dataPath: String) {}
   }
 
-  class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging {
+  class PartitionModificationsListener(protected val controller: KafkaController, topic: String) extends ControllerZkDataListener {
 
-    this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
+    protected def logName = "AddPartitionsListener"
 
-    @throws(classOf[Exception])
-    def handleDataChange(dataPath : String, data: Object) {
+    def doHandleDataChange(dataPath: String, data: AnyRef) {
       inLock(controllerContext.controllerLock) {
         try {
           info(s"Partition modification triggered $data for path $dataPath")
@@ -521,15 +511,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             }
           }
         } catch {
-          case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
+          case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e)
         }
       }
     }
 
-    @throws(classOf[Exception])
-    def handleDataDeleted(parentPath : String) {
-      // this is not implemented for partition change
-    }
+    // this is not implemented for partition change
+    def doHandleDataDeleted(parentPath: String): Unit = {}
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index a26f95a..b106b01 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -17,13 +17,11 @@
 package kafka.controller
 
 import collection._
-import collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
 import kafka.controller.Callbacks.CallbackBuilder
 import kafka.utils.{Logging, ReplicationUtils, ZkUtils}
-import org.I0Itec.zkclient.IZkChildListener
 import kafka.utils.CoreUtils._
 
 /**
@@ -49,7 +47,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkUtils = controllerContext.zkUtils
   private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
-  private val brokerChangeListener = new BrokerChangeListener()
+  private val brokerChangeListener = new BrokerChangeListener(controller)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -348,15 +346,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   /**
    * This is the zookeeper listener that triggers all the state transitions for a replica
    */
-  class BrokerChangeListener() extends IZkChildListener with Logging {
-    this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
-    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.asScala.sorted.mkString(",")))
+  class BrokerChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
+
+    protected def logName = "BrokerChangeListener"
+
+    def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
+      info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
       inLock(controllerContext.controllerLock) {
         if (hasStarted.get) {
           ControllerStats.leaderElectionTimer.time {
             try {
-              val curBrokers = currentBrokerList.asScala.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
+              val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
               val curBrokerIds = curBrokers.map(_.id)
               val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
               val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a89d515..c66f9e3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1153,7 +1153,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }
 
-    if (!controller.isActive()) {
+    if (!controller.isActive) {
       val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
         (topic, Errors.NOT_CONTROLLER)
       }
@@ -1203,7 +1203,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }
 
-    if (!controller.isActive()) {
+    if (!controller.isActive) {
       val results = deleteTopicRequest.topics.asScala.map { topic =>
         (topic, Errors.NOT_CONTROLLER)
       }.toMap

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 0ae9124..4133145 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -92,12 +92,12 @@ class KafkaHealthcheck(brokerId: Int,
       }
     }
 
-    @throws(classOf[Exception])
+    @throws[Exception]
     override def handleStateChanged(state: KeeperState) {
       stateToMeterMap.get(state).foreach(_.mark())
     }
 
-    @throws(classOf[Exception])
+    @throws[Exception]
     override def handleNewSession() {
       info("re-registering broker info in ZK for broker " + brokerId)
       register()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index d9e2b5b..ca0f6a0 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -16,7 +16,6 @@
  */
 package kafka.server
 
-import kafka.utils.ZkUtils._
 import kafka.utils.CoreUtils._
 import kafka.utils.{Json, Logging, ZKCheckedEphemeral}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -121,7 +120,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
      * @throws Exception On any error.
      */
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleDataChange(dataPath: String, data: Object) {
       val shouldResign = inLock(controllerContext.controllerLock) {
         val amILeaderBeforeDataChange = amILeader
@@ -131,9 +130,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
         amILeaderBeforeDataChange && !amILeader
       }
 
-      if (shouldResign) {
+      if (shouldResign)
         onResigningAsLeader()
-      }
     }
 
     /**
@@ -141,7 +139,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      * @throws Exception
      *             On any error.
      */
-    @throws(classOf[Exception])
+    @throws[Exception]
     def handleDataDeleted(dataPath: String) { 
       val shouldResign = inLock(controllerContext.controllerLock) {
         debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
@@ -149,9 +147,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
         amILeader
       }
 
-      if(shouldResign) {
+      if (shouldResign)
         onResigningAsLeader()
-      }
 
       inLock(controllerContext.controllerLock) {
         elect

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 63afd4e..fd23894 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -68,7 +68,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
     for (server <- this.servers) {
       epochMap += (server.config.brokerId -> server.kafkaController.epoch)
-      if(server.kafkaController.isActive()) {
+      if(server.kafkaController.isActive) {
         controller = server
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 35dbbf0..a166495 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -56,13 +56,13 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
 
   def controllerSocketServer = {
     servers.find { server =>
-      server.kafkaController.isActive()
+      server.kafkaController.isActive
     }.map(_.socketServer).getOrElse(throw new IllegalStateException("No controller broker is available"))
   }
 
   def notControllerSocketServer = {
     servers.find { server =>
-      !server.kafkaController.isActive()
+      !server.kafkaController.isActive
     }.map(_.socketServer).getOrElse(throw new IllegalStateException("No non-controller broker is available"))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a786be94/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 11dd6fe..9bcf4fd 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -49,7 +49,7 @@ class MetadataRequestTest extends BaseRequestTest {
 
   @Test
   def testControllerId() {
-    val controllerServer = servers.find(_.kafkaController.isActive()).get
+    val controllerServer = servers.find(_.kafkaController.isActive).get
     val controllerId = controllerServer.config.brokerId
     val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
 
@@ -60,7 +60,7 @@ class MetadataRequestTest extends BaseRequestTest {
     controllerServer.shutdown()
     controllerServer.startup()
 
-    val controllerServer2 = servers.find(_.kafkaController.isActive()).get
+    val controllerServer2 = servers.find(_.kafkaController.isActive).get
     val controllerId2 = controllerServer2.config.brokerId
     assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
     TestUtils.waitUntilTrue(() => {