You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/03 20:10:46 UTC

[1/2] git commit: KAFKA-1185 Improve leader elector module to have a resign API; reviewed by Guozhang Wang and Jun Rao

Updated Branches:
  refs/heads/trunk b23cf1968 -> a119f532c


KAFKA-1185 Improve leader elector module to have a resign API; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10fa2000
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10fa2000
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10fa2000

Branch: refs/heads/trunk
Commit: 10fa20001dd22a2cfc7da065d75d7cf6c0009b42
Parents: b5d1687
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Dec 20 14:39:03 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Dec 20 14:39:03 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 32 ++++++++++-------
 .../kafka/server/ZookeeperLeaderElector.scala   | 36 +++++++++++---------
 2 files changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10fa2000/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 965d0e5..2fcc36d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -110,8 +110,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
-  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
-    config.brokerId)
+  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
+                                                             onControllerFailover, onControllerResignation, config.brokerId)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -256,6 +256,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   /**
+   * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
+   * required to clean up internal controller data structures
+   */
+  def onControllerResignation() {
+    controllerContext.controllerLock synchronized {
+      Utils.unregisterMBean(KafkaController.MBeanName)
+      partitionStateMachine.shutdown()
+      replicaStateMachine.shutdown()
+      if(controllerContext.controllerChannelManager != null) {
+        controllerContext.controllerChannelManager.shutdown()
+        controllerContext.controllerChannelManager = null
+      }
+    }
+  }
+
+  /**
    * Returns true if this broker is the current controller.
    */
   def isActive(): Boolean = {
@@ -894,16 +910,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     @throws(classOf[Exception])
     def handleNewSession() {
       info("ZK expired; shut down all controller components and try to re-elect")
-      controllerContext.controllerLock synchronized {
-        Utils.unregisterMBean(KafkaController.MBeanName)
-        partitionStateMachine.shutdown()
-        replicaStateMachine.shutdown()
-        if(controllerContext.controllerChannelManager != null) {
-          controllerContext.controllerChannelManager.shutdown()
-          controllerContext.controllerChannelManager = null
-        }
-        controllerElector.elect
-      }
+      onControllerResignation()
+      controllerElector.elect
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10fa2000/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index cc6f1eb..b189619 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -30,7 +30,10 @@ import kafka.common.KafkaException
  * leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change
  * callback
  */
-class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit,
+class ZookeeperLeaderElector(controllerContext: ControllerContext,
+                             electionPath: String,
+                             onBecomingLeader: () => Unit,
+                             onResigningAsLeader: () => Unit,
                              brokerId: Int)
   extends LeaderElector with Logging {
   var leaderId = -1
@@ -58,23 +61,22 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
       info(brokerId + " successfully elected as leader")
       leaderId = brokerId
       onBecomingLeader()
-      } catch {
-        case e: ZkNodeExistsException =>
-          // If someone else has written the path, then
-          leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
-            case Some(controller) => KafkaController.parseControllerId(controller)
-            case None => {
-              warn("A leader has been elected but just resigned, this will result in another round of election")
-              -1
-            }
+    } catch {
+      case e: ZkNodeExistsException =>
+        // If someone else has written the path, then read the current leader's id from it
+        leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+          case Some(controller) => KafkaController.parseControllerId(controller)
+          case None => {
+            warn("A leader has been elected but just resigned, this will result in another round of election")
+            -1
           }
-          if (leaderId != -1)
-            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-        case e2: Throwable =>
-          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-          leaderId = -1
+        }
+        if (leaderId != -1)
+          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+      case e2: Throwable =>
+        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+        resign()
     }
-
     amILeader
   }
 
@@ -116,6 +118,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
       controllerContext.controllerLock synchronized {
         debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
           .format(brokerId, dataPath))
+        if(amILeader)
+          onResigningAsLeader()
         elect
       }
     }


[2/2] git commit: Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930

Posted by ne...@apache.org.
Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a119f532
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a119f532
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a119f532

Branch: refs/heads/trunk
Commit: a119f532c8b310122d85391efe11fd26027ef7f9
Parents: 10fa200 b23cf19
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Jan 3 11:06:04 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 3 11:06:04 2014 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 102 +++++++++++++++++--
 .../main/scala/kafka/server/KafkaConfig.scala   |  12 +++
 2 files changed, 108 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a119f532/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/controller/KafkaController.scala
index 2fcc36d,ca2f09b..6215cb8
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@@ -110,8 -110,11 +110,11 @@@ class KafkaController(val config : Kafk
    val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
    private val partitionStateMachine = new PartitionStateMachine(this)
    private val replicaStateMachine = new ReplicaStateMachine(this)
-   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
-                                                              onControllerFailover, onControllerResignation, config.brokerId)
+   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
 -    config.brokerId)
++    onControllerResignation, config.brokerId)
+   // have a separate scheduler for the controller to be able to start and stop independently of the
+   // kafka server
+   private val autoRebalanceScheduler = new KafkaScheduler(1)
    val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
    private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
    private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@@ -910,8 -921,77 +937,71 @@@
      @throws(classOf[Exception])
      def handleNewSession() {
        info("ZK expired; shut down all controller components and try to re-elect")
-       onControllerResignation()
-       controllerElector.elect
+       controllerContext.controllerLock synchronized {
 -        Utils.unregisterMBean(KafkaController.MBeanName)
 -        partitionStateMachine.shutdown()
 -        replicaStateMachine.shutdown()
 -        if(controllerContext.controllerChannelManager != null) {
 -          controllerContext.controllerChannelManager.shutdown()
 -          controllerContext.controllerChannelManager = null
 -        }
++        onControllerResignation()
+         controllerElector.elect
+       }
+     }
+   }
+ 
+   private def checkAndTriggerPartitionRebalance(): Unit = {
+     if (isActive()) {
+       trace("checking need to trigger partition rebalance")
+       // group the assigned partitions by their preferred replica (the first broker in each assigned-replica list)
+       var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
+       controllerContext.controllerLock synchronized {
+         preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
+           case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+         }
+       }
+       debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+       // for each broker, check if a preferred replica election needs to be triggered
+       preferredReplicasForTopicsByBrokers.foreach {
+         case(leaderBroker, topicAndPartitionsForBroker) => {
+           var imbalanceRatio: Double = 0
+           var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+           controllerContext.controllerLock synchronized {
+             topicsNotInPreferredReplica =
+               topicAndPartitionsForBroker.filter {
+                 case(topicPartition, replicas) => {
+                   controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+                 }
+               }
+             debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+             val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+             val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+             imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+             trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
+           }
+           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+           // that need to be on this broker
+           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+             controllerContext.controllerLock synchronized {
+               // do this check only if the broker is live and there are no partitions being reassigned currently
+               // and preferred replica election is not in progress
+               if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                   controllerContext.partitionsBeingReassigned.size == 0 &&
+                   controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+                 val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+                 val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+                 val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+                 try {
+                   ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+                   info("Created preferred replica election path with %s".format(jsonData))
+                 } catch {
+                   case e2: ZkNodeExistsException =>
+                     val partitionsUndergoingPreferredReplicaElection =
+                       PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+                     error("Preferred replica leader election currently in progress for " +
+                           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
+                   case e3: Throwable =>
+                     error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+                 }
+               }
+             }
+           }
+         }
+       }
      }
    }
  }