Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/05 21:59:02 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup

mumrah commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571268386



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {
+    startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames))

Review comment:
       We are able to load the topic configs right away because they are coming from ZK, right?
   
   

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {
+    startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames))
+  }
+
+  // visible for testing
+  private[log] def generateTopicLogConfigs(topicNames: Set[String]): Map[String, LogConfig] = {
+    val topicLogConfigs: mutable.Map[String, LogConfig] = mutable.Map()

Review comment:
       nit: you can move the type info to the right-hand side and just have `val topicLogConfigs = ...`
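
To illustrate the nit, here is a minimal standalone sketch of the two declaration styles. `LogConfig` below is a hypothetical stand-in case class, not the real `kafka.log.LogConfig`, and the object name is made up for the example:

```scala
import scala.collection.mutable

// Hypothetical stand-in for kafka.log.LogConfig, used only for illustration.
final case class LogConfig(retentionMs: Long)

object TypeOnRightHandSide {
  // Type annotation on the left-hand side, as written in the diff.
  val annotated: mutable.Map[String, LogConfig] = mutable.Map()

  // Type parameters on the right-hand side; the declared type is inferred.
  val inferred = mutable.Map[String, LogConfig]()
}
```

Both values end up with the static type `mutable.Map[String, LogConfig]`, so the change is purely stylistic.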

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {

Review comment:
       It feels slightly odd to pass in the set of topics to load here, but I can't think of a good way to avoid it. Perhaps we could pass MetadataCache into LogManager and let startup call MetadataCache#getAllTopics? That might be riskier, though, since it changes the startup order in KafkaServer. Maybe we can look into this as a follow-up.
   
   Besides that, the name here seems strange. Maybe something like "topicsToLoad"?
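
A rough sketch of the two options discussed above, under stated assumptions: `TopicSource` and `SketchLogManager` are hypothetical names invented for this example, and the real `LogManager` and `MetadataCache` have much larger interfaces. Option A mirrors the by-name parameter from the diff with the suggested `topicsToLoad` name; Option B is the follow-up idea of injecting the source of topics.

```scala
// Hypothetical stand-in for the part of MetadataCache the comment refers to.
trait TopicSource {
  def getAllTopics(): Set[String]
}

// Simplified sketch; the real LogManager takes many more constructor arguments.
class SketchLogManager(topicSource: Option[TopicSource] = None) {
  var loadedTopics: Set[String] = Set.empty

  // Option A: a by-name parameter, evaluated only when startup reads it.
  def startup(topicsToLoad: => Set[String]): Unit = {
    loadedTopics = topicsToLoad
  }

  // Option B: ask an injected source for the topics at startup time.
  def startupFromSource(): Unit = {
    loadedTopics = topicSource.map(_.getAllTopics()).getOrElse(Set.empty)
  }
}
```

With Option A the caller decides where the topic names come from, which keeps the existing startup order intact. Option B moves that decision into the manager, at the cost of requiring the source to exist before the manager is constructed.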

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File],
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
 
-        val jobsForDir = logsToLoad.map { logDir =>
+        val jobsForDir = logsToLoad
+          .filter(logDir => Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)

Review comment:
       Will the metadata topic get passed into LogManager? I would guess not since it's a Raft topic and not a regular Kafka topic. 
   
   Also, a style nit: you can write `logsToLoad.filter { logDir => ... }`, similar to the `map` below.
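
For reference, a small self-contained sketch of the brace style for `filter` followed by `map`. Everything here is illustrative: `topicOf` is a hypothetical stand-in for `Log.parseTopicPartitionName(logDir).topic`, the directory names are invented, and the `MetadataTopic` value is a placeholder rather than the real constant from `KafkaRaftServer`.

```scala
object FilterStyle {
  // Placeholder value for this sketch only; the real constant lives in KafkaRaftServer.
  val MetadataTopic = "placeholder_metadata"

  // Invented directory names in "<topic>-<partition>" form.
  val logsToLoad = Seq("orders-0", "orders-1", "placeholder_metadata-0")

  // Hypothetical stand-in for Log.parseTopicPartitionName(logDir).topic:
  // everything before the last '-' is treated as the topic name.
  def topicOf(dirName: String): String =
    dirName.substring(0, dirName.lastIndexOf('-'))

  // Brace style for both calls, so filter and map read the same way.
  val jobsForDir = logsToLoad
    .filter { logDir => topicOf(logDir) != MetadataTopic }
    .map { logDir => s"load $logDir" }
}
```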



