You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/20 01:48:57 UTC
kafka git commit: KAFKA-6071; Use ZookeeperClient in LogManager
Repository: kafka
Updated Branches:
refs/heads/trunk 5ec6765bd -> fdbd4d62f
KAFKA-6071; Use ZookeeperClient in LogManager
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #4089 from omkreddy/KAFKA-6071-ZK-LOGMANAGER
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fdbd4d62
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fdbd4d62
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fdbd4d62
Branch: refs/heads/trunk
Commit: fdbd4d62f30991db7e7ce8ee9004a0431c8c226e
Parents: 5ec6765
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Thu Oct 19 18:48:54 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 19 18:48:54 2017 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogManager.scala | 9 +++---
.../main/scala/kafka/server/KafkaServer.scala | 31 ++++++++++----------
2 files changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fdbd4d62/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 0c92c71..bd7f023 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,8 +22,8 @@ import java.nio.file.Files
import java.util.concurrent._
import com.yammer.metrics.core.Gauge
-import kafka.admin.AdminUtils
import kafka.common.KafkaException
+import kafka.controller.KafkaControllerZkUtils
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
@@ -888,7 +888,7 @@ object LogManager {
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
- zkUtils: ZkUtils,
+ zkUtils: KafkaControllerZkUtils,
brokerState: BrokerState,
kafkaScheduler: KafkaScheduler,
time: Time,
@@ -897,9 +897,8 @@ object LogManager {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
- val topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
- topic -> LogConfig.fromProps(defaultProps, configs)
- }
+ val (topicConfigs, failed) = zkUtils.getLogConfigs(zkUtils.getAllTopicsInCluster, defaultProps)
+ if (!failed.isEmpty) throw failed.head._2
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
http://git-wip-us.apache.org/repos/asf/kafka/blob/fdbd4d62/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 739405f..101e646 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -219,21 +219,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
- /* start log manager */
- logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
- logManager.startup()
-
- metadataCache = new MetadataCache(config.brokerId)
- credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
-
- socketServer = new SocketServer(config, metrics, time, credentialProvider)
- socketServer.startup()
-
- /* start replica manager */
- replicaManager = createReplicaManager(isShuttingDown)
- replicaManager.startup()
-
- /* start kafka controller */
val zookeeperClient = new ZookeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, new StateChangeHandler {
override def onReconnectionTimeout(): Unit = {
@@ -249,6 +234,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
override def beforeInitializingSession(): Unit = kafkaController.expire()
})
kafkaControllerZkUtils = new KafkaControllerZkUtils(zookeeperClient, zkUtils.isSecure)
+
+ /* start log manager */
+ logManager = LogManager(config, initialOfflineDirs, kafkaControllerZkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
+ logManager.startup()
+
+ metadataCache = new MetadataCache(config.brokerId)
+ credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
+
+ socketServer = new SocketServer(config, metrics, time, credentialProvider)
+ socketServer.startup()
+
+ /* start replica manager */
+ replicaManager = createReplicaManager(isShuttingDown)
+ replicaManager.startup()
+
+ /* start kafka controller */
kafkaController = new KafkaController(config, kafkaControllerZkUtils, time, metrics, threadNamePrefix)
kafkaController.startup()