You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:27:19 UTC

[09/19] git commit: commit missing code

commit missing code


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/033872b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/033872b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/033872b3

Branch: refs/heads/trunk
Commit: 033872b316fd5a68d7463138d8199fb5d821f41b
Parents: a4cd17a
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Nov 19 17:38:13 2013 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Tue Nov 19 17:38:13 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/controller/KafkaController.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/033872b3/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 476ed86..e2ad682 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -114,7 +114,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     config.brokerId)
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
-  private val controllerScheduler = new KafkaScheduler(1)
+  private val autoRebalanceScheduler = new KafkaScheduler(1)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -255,8 +255,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
       if (config.autoLeaderRebalanceEnable) {
         info("starting the partition rebalance scheduler")
-        controllerScheduler.startup()
-        controllerScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
+        autoRebalanceScheduler.startup()
+        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
           5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
       }
     }
@@ -502,7 +502,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       isRunning = false
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
-      controllerScheduler.shutdown()
+      if (config.autoLeaderRebalanceEnable)
+        autoRebalanceScheduler.shutdown()
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null