You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/03 20:10:47 UTC

[2/2] git commit: Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930

Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a119f532
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a119f532
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a119f532

Branch: refs/heads/trunk
Commit: a119f532c8b310122d85391efe11fd26027ef7f9
Parents: 10fa200 b23cf19
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Jan 3 11:06:04 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 3 11:06:04 2014 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 102 +++++++++++++++++--
 .../main/scala/kafka/server/KafkaConfig.scala   |  12 +++
 2 files changed, 108 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a119f532/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/controller/KafkaController.scala
index 2fcc36d,ca2f09b..6215cb8
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@@ -110,8 -110,11 +110,11 @@@ class KafkaController(val config : Kafk
    val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
    private val partitionStateMachine = new PartitionStateMachine(this)
    private val replicaStateMachine = new ReplicaStateMachine(this)
-   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
-                                                              onControllerFailover, onControllerResignation, config.brokerId)
+   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
 -    config.brokerId)
++    onControllerResignation, config.brokerId)
+   // have a separate scheduler for the controller to be able to start and stop independently of the
+   // kafka server
+   private val autoRebalanceScheduler = new KafkaScheduler(1)
    val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
    private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
    private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@@ -910,8 -921,77 +937,71 @@@
      @throws(classOf[Exception])
      def handleNewSession() {
        info("ZK expired; shut down all controller components and try to re-elect")
-       onControllerResignation()
-       controllerElector.elect
+       controllerContext.controllerLock synchronized {
 -        Utils.unregisterMBean(KafkaController.MBeanName)
 -        partitionStateMachine.shutdown()
 -        replicaStateMachine.shutdown()
 -        if(controllerContext.controllerChannelManager != null) {
 -          controllerContext.controllerChannelManager.shutdown()
 -          controllerContext.controllerChannelManager = null
 -        }
++        onControllerResignation()
+         controllerElector.elect
+       }
+     }
+   }
+ 
+   private def checkAndTriggerPartitionRebalance(): Unit = {
+     if (isActive()) {
+       trace("checking need to trigger partition rebalance")
+       // get all the active brokers
+       var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
+       controllerContext.controllerLock synchronized {
+         preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
+           case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+         }
+       }
+       debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+       // for each broker, check if a preferred replica election needs to be triggered
+       preferredReplicasForTopicsByBrokers.foreach {
+         case(leaderBroker, topicAndPartitionsForBroker) => {
+           var imbalanceRatio: Double = 0
+           var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+           controllerContext.controllerLock synchronized {
+             topicsNotInPreferredReplica =
+               topicAndPartitionsForBroker.filter {
+                 case(topicPartition, replicas) => {
+                   controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+                 }
+               }
+             debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+             val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+             val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+             imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+             trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
+           }
+           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+           // that need to be on this broker
+           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+             controllerContext.controllerLock synchronized {
+               // do this check only if the broker is live and there are no partitions being reassigned currently
+               // and preferred replica election is not in progress
+               if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                   controllerContext.partitionsBeingReassigned.size == 0 &&
+                   controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+                 val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+                 val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+                 val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+                 try {
+                   ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+                   info("Created preferred replica election path with %s".format(jsonData))
+                 } catch {
+                   case e2: ZkNodeExistsException =>
+                     val partitionsUndergoingPreferredReplicaElection =
+                       PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+                     error("Preferred replica leader election currently in progress for " +
+                           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
+                   case e3: Throwable =>
+                     error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+                 }
+               }
+             }
+           }
+         }
+       }
      }
    }
  }