You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/20 01:48:57 UTC

kafka git commit: KAFKA-6071; Use ZookeeperClient in LogManager

Repository: kafka
Updated Branches:
  refs/heads/trunk 5ec6765bd -> fdbd4d62f


KAFKA-6071; Use ZookeeperClient in LogManager

Author: Manikumar Reddy <ma...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #4089 from omkreddy/KAFKA-6071-ZK-LOGMANAGER


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fdbd4d62
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fdbd4d62
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fdbd4d62

Branch: refs/heads/trunk
Commit: fdbd4d62f30991db7e7ce8ee9004a0431c8c226e
Parents: 5ec6765
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Thu Oct 19 18:48:54 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 19 18:48:54 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogManager.scala  |  9 +++---
 .../main/scala/kafka/server/KafkaServer.scala   | 31 ++++++++++----------
 2 files changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fdbd4d62/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 0c92c71..bd7f023 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,8 +22,8 @@ import java.nio.file.Files
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
-import kafka.admin.AdminUtils
 import kafka.common.KafkaException
+import kafka.controller.KafkaControllerZkUtils
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
@@ -888,7 +888,7 @@ object LogManager {
 
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
-            zkUtils: ZkUtils,
+            zkUtils: KafkaControllerZkUtils,
             brokerState: BrokerState,
             kafkaScheduler: KafkaScheduler,
             time: Time,
@@ -897,9 +897,8 @@ object LogManager {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
-    val topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
-      topic -> LogConfig.fromProps(defaultProps, configs)
-    }
+    val (topicConfigs, failed) = zkUtils.getLogConfigs(zkUtils.getAllTopicsInCluster, defaultProps)
+    if (!failed.isEmpty) throw failed.head._2
 
     // read the log configurations from zookeeper
     val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fdbd4d62/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 739405f..101e646 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -219,21 +219,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-        /* start log manager */
-        logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
-        logManager.startup()
-
-        metadataCache = new MetadataCache(config.brokerId)
-        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
-
-        socketServer = new SocketServer(config, metrics, time, credentialProvider)
-        socketServer.startup()
-
-        /* start replica manager */
-        replicaManager = createReplicaManager(isShuttingDown)
-        replicaManager.startup()
-
-        /* start kafka controller */
         val zookeeperClient = new ZookeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
           config.zkConnectionTimeoutMs, new StateChangeHandler {
             override def onReconnectionTimeout(): Unit = {
@@ -249,6 +234,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
             override def beforeInitializingSession(): Unit = kafkaController.expire()
           })
         kafkaControllerZkUtils = new KafkaControllerZkUtils(zookeeperClient, zkUtils.isSecure)
+
+        /* start log manager */
+        logManager = LogManager(config, initialOfflineDirs, kafkaControllerZkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
+        logManager.startup()
+
+        metadataCache = new MetadataCache(config.brokerId)
+        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
+
+        socketServer = new SocketServer(config, metrics, time, credentialProvider)
+        socketServer.startup()
+
+        /* start replica manager */
+        replicaManager = createReplicaManager(isShuttingDown)
+        replicaManager.startup()
+
+        /* start kafka controller */
         kafkaController = new KafkaController(config, kafkaControllerZkUtils, time, metrics, threadNamePrefix)
         kafkaController.startup()