You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:27:19 UTC
[09/19] git commit: commit missing code
commit missing code
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/033872b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/033872b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/033872b3
Branch: refs/heads/trunk
Commit: 033872b316fd5a68d7463138d8199fb5d821f41b
Parents: a4cd17a
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Nov 19 17:38:13 2013 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Tue Nov 19 17:38:13 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/controller/KafkaController.scala | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/033872b3/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 476ed86..e2ad682 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -114,7 +114,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
config.brokerId)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
- private val controllerScheduler = new KafkaScheduler(1)
+ private val autoRebalanceScheduler = new KafkaScheduler(1)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -255,8 +255,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
- controllerScheduler.startup()
- controllerScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
+ autoRebalanceScheduler.startup()
+ autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
}
}
@@ -502,7 +502,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
isRunning = false
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
- controllerScheduler.shutdown()
+ if (config.autoLeaderRebalanceEnable)
+ autoRebalanceScheduler.shutdown()
if(controllerContext.controllerChannelManager != null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null