Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/21 21:34:20 UTC

[GitHub] [kafka] ccding opened a new pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

ccding opened a new pull request #11351:
URL: https://github.com/apache/kafka/pull/11351


   We have seen an exception caused by shutting down LogManager before shutting down the scheduler.
   
   When LogManager was closing partitions one by one, the scheduler ran a task to delete old segments due to retention. However, the old segments could already have been closed by LogManager, which caused an exception and subsequently marked the log dir as offline. As a result, the broker didn't flush the remaining partitions and didn't write the clean shutdown marker. Ultimately, the broker took hours to recover the log during restart.
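   A minimal sketch of the ordering this PR argues for, assuming a plain `ScheduledThreadPoolExecutor` stands in for `KafkaScheduler` and a hypothetical `FakeLogManager` stands in for `LogManager`. The scheduler is stopped and drained before the log manager closes its segments, so a retention task cannot run against a closed segment. It relies on the JDK default that periodic tasks are cancelled by `shutdown()`.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for LogManager: counts retention runs that hit closed segments.
final class FakeLogManager {
  private val closed = new AtomicBoolean(false)
  val failures = new AtomicInteger(0)

  // Stand-in for a retention task deleting old segments; in the real broker,
  // touching a closed segment would surface as an exception and an offline log dir.
  def deleteOldSegments(): Unit =
    if (closed.get()) failures.incrementAndGet()

  def shutdown(): Unit = closed.set(true)
}

object ShutdownOrdering {
  // Scheduler first: stop accepting tasks, wait for running ones, then close the logs.
  def shutdown(scheduler: ScheduledThreadPoolExecutor, logManager: FakeLogManager): Unit = {
    scheduler.shutdown()                             // periodic tasks are cancelled by default
    scheduler.awaitTermination(10, TimeUnit.SECONDS) // let an in-flight retention run finish
    logManager.shutdown()                            // no task can touch closed segments now
  }
}
```

   Calling `logManager.shutdown()` before the scheduler instead reopens the window in which a periodic retention task can observe closed segments.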
   
   This PR essentially reverts https://github.com/apache/kafka/pull/10538
   
   I believe the exception that https://github.com/apache/kafka/pull/10538 saw is raised at https://github.com/apache/kafka/blob/5a6f19b2a1ff72c52ad627230ffdf464456104ee/core/src/main/scala/kafka/log/LocalLog.scala#L895-L903, which calls the scheduler; that exception crashed the compaction thread. Its effect has since been mitigated by https://github.com/apache/kafka/pull/10763
   
   cc @rondagostino @ijuma @cmccabe @junrao @dhruvilshah3 as authors/reviewers of the PRs mentioned above, to make sure this change looks okay.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r715088887



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -141,9 +145,14 @@ class KafkaScheduler(val threads: Int,
       executor != null
     }
   }
-  
-  private def ensureRunning(): Unit = {
-    if (!isStarted)
-      throw new IllegalStateException("Kafka scheduler is not running.")
-  }
+}
+
+private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+  override def isCancelled: Boolean = true
+  override def isDone: Boolean = true
+  override def get(): Unit = {}
+  override def get(timeout: Long, unit: TimeUnit): Unit = {}
+  override def getDelay(unit: TimeUnit): Long = 0
+  override def compareTo(o: Delayed): Int = 0

Review comment:
       NoOpScheduledFutureTask extends ScheduledFuture, ScheduledFuture extends Delayed, and Delayed extends Comparable<Delayed>, so we should use Delayed here. Also, it doesn't compile if I change it to ScheduledFuture.
   
   Fixed the return value.
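   To illustrate the type argument above: `ScheduledFuture[V]` extends both `Delayed` and `Future[V]`, and `Delayed` extends `Comparable[Delayed]`, so `compareTo` must take a `Delayed`. The sketch below is a self-contained no-op future modeled on the hunk above; its return values mirror the hunk as quoted and may differ from what was finally merged.

```scala
import java.util.concurrent.{Delayed, ScheduledFuture, TimeUnit}

// A future standing in for a task that was never scheduled.
class NoOpScheduledFutureTask extends ScheduledFuture[Unit] {
  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
  override def isCancelled(): Boolean = true
  override def isDone(): Boolean = true
  override def get(): Unit = ()
  override def get(timeout: Long, unit: TimeUnit): Unit = ()
  override def getDelay(unit: TimeUnit): Long = 0L
  // Inherited from Comparable[Delayed]; a task that never runs has no ordering, so all compare equal.
  override def compareTo(o: Delayed): Int = 0
}
```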







[GitHub] [kafka] ijuma commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714391884



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -53,7 +53,7 @@ trait Scheduler {
    * @param unit The unit for the preceding times.
    * @return A Future object to manage the task scheduled.
    */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : Option[ScheduledFuture[_]]

Review comment:
       We can return a Future that does nothing too, right?







[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714188175



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -482,6 +482,20 @@ class BrokerServer(
       }
       metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
 
+      /**
+       * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+       * resources that might have been shutdown and cause exceptions.
+       * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+       * partitions one by one, the scheduler might concurrently delete old segments due to
+       * retention. However, the old segments could have been closed by the LogManager, which would
+       * cause an exception and subsequently mark logdir as offline. As a result, the broker would
+       * not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
+       * broker would have to take hours to recover the log during restart and are subject to
+       * potential data loss.

Review comment:
       I don't think unclean shutdown will cause data loss if acks = all is used. So, we can just remove that statement.







[GitHub] [kafka] junrao commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-926239908


   Cherry-picked to the 3.0 branch too.





[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714377435



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -53,7 +53,7 @@ trait Scheduler {
    * @param unit The unit for the preceding times.
    * @return A Future object to manage the task scheduled.
    */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : Option[ScheduledFuture[_]]

Review comment:
       Because we decided not to throw an exception, to avoid polluting the log during broker shutdown. If we don't throw, we must return something: Option looks better than null.
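   A minimal sketch of the `Option`-returning variant under discussion, using a hypothetical simplified `OptionScheduler` (not Kafka's `KafkaScheduler`) that wraps a `ScheduledThreadPoolExecutor`. Scheduling on a stopped scheduler logs and returns `None` instead of throwing; `println` stands in for an info-level log. A caller such as the `UnifiedLog` producer-expiration check would then cancel with `producerExpireCheck.foreach(_.cancel(false))`.

```scala
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

// Hypothetical simplified scheduler; not Kafka's KafkaScheduler.
class OptionScheduler(threads: Int) {
  private var executor: ScheduledThreadPoolExecutor = null

  def startup(): Unit = synchronized {
    executor = new ScheduledThreadPoolExecutor(threads)
  }

  def shutdown(): Unit = {
    // Detach the executor under the lock so later schedule() calls see "not running".
    val cached = synchronized {
      val e = executor
      executor = null
      e
    }
    if (cached != null) {
      cached.shutdown()
      cached.awaitTermination(10, TimeUnit.SECONDS)
    }
  }

  // Returns None instead of throwing IllegalStateException when the scheduler is not running.
  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): Option[ScheduledFuture[_]] =
    synchronized {
      if (executor == null) {
        println(s"Scheduler is not running at the time task '$name' is scheduled. The task is ignored.")
        None
      } else {
        val task: Runnable = () => fun()
        // A negative period means a one-shot task, as in the Scheduler trait's default.
        val future =
          if (period >= 0) executor.scheduleAtFixedRate(task, delay, period, unit)
          else executor.schedule(task, delay, unit)
        Some(future)
      }
    }
}
```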







[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713478871



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       (1) Could we add some comment to explain why we want to shut down the log manager later?
   (2) If we are shutting down, it makes sense to stop the scheduler first since there is no guarantee any asynchronously scheduled task will complete. If this is the case, should we shut down the scheduler before any other component (e.g. ReplicaManager also uses the scheduler)?
   (3) Once the scheduler is shut down, scheduling new tasks causes IllegalStateException. That's probably the exception that https://github.com/apache/kafka/pull/10538 tries to fix. To avoid polluting the log, perhaps we could change KafkaScheduler such that it avoids throwing an exception once it's shutdown.







[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714186270



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -107,7 +107,10 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time '%s' is scheduled.".format(name))
+        return null

Review comment:
       In UnifiedLog, we have code that uses the returned future:
   
   `val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {`
   
   With this change, perhaps we could return an Option and let the caller deal with it accordingly?







[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714216176



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       Yeah, it does not wait for previously submitted tasks to complete execution, but it does not kill them either. KafkaScheduler does call `awaitTermination` with a 1-day timeout. Do you think we want a shorter timeout?







[GitHub] [kafka] ijuma commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714375625



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -53,7 +53,7 @@ trait Scheduler {
    * @param unit The unit for the preceding times.
    * @return A Future object to manage the task scheduled.
    */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : Option[ScheduledFuture[_]]

Review comment:
       Why do we need an `Option` here?







[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713531154



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       Thanks for the code review. I moved `kafkaScheduler.shutdown` upfront, changed the exception to an info-level log, and added comments.
   
   For the exception-to-log change: I looked at the code that calls `startup` and `shutdown`. It appears we always call `startup` right after creating the scheduler object and call `shutdown` from the owning component's shutdown or close. I think the callers make sure that they won't call `scheduler.schedule` on a scheduler that has been shut down, unless we are shutting down the broker. So it should be okay to change the `IllegalStateException` to an info-level log.







[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714188175



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -482,6 +482,20 @@ class BrokerServer(
       }
       metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
 
+      /**
+       * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+       * resources that might have been shutdown and cause exceptions.
+       * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+       * partitions one by one, the scheduler might concurrently delete old segments due to
+       * retention. However, the old segments could have been closed by the LogManager, which would
+       * cause an exception and subsequently mark logdir as offline. As a result, the broker would
+       * not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
+       * broker would have to take hours to recover the log during restart and are subject to
+       * potential data loss.

Review comment:
       I don't think unclean shutdown will cause data loss if acks = all is used.

##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -107,7 +107,10 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time '%s' is scheduled.".format(name))

Review comment:
       Perhaps we could add that we are ignoring the named task.

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -482,6 +482,20 @@ class BrokerServer(
       }
       metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
 
+      /**
+       * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+       * resources that might have been shutdown and cause exceptions.
+       * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+       * partitions one by one, the scheduler might concurrently delete old segments due to
+       * retention. However, the old segments could have been closed by the LogManager, which would
+       * cause an exception and subsequently mark logdir as offline. As a result, the broker would

Review comment:
       exception => IOException ?

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       @ijuma and @ccding: The description of ScheduledThreadPoolExecutor.shutdown() says
   
   `This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.`
   
   So, we probably want to call awaitTermination() with a timeout like 10 seconds to make sure all existing tasks complete before shutting down other components.
   
   We probably can't use shutdownNow() since it interrupts the task and could cause IOException when blocking operations (e.g., force) are applied on a file channel. This will then lead to unclean shutdown.
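   The shutdown-then-wait pattern suggested above can be sketched as follows, assuming a bare `ScheduledThreadPoolExecutor` and the 10-second timeout mentioned in the comment; `GracefulSchedulerShutdown` is a hypothetical name. `shutdownNow()` is deliberately avoided: `FileChannel` is an interruptible channel, so interrupting a thread blocked in an operation such as `force` closes the channel and raises `ClosedByInterruptException`, which is an `IOException`.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

object GracefulSchedulerShutdown {
  // Stop accepting new tasks, then wait up to `timeout` for tasks already running.
  // Returns true if the executor terminated within the timeout.
  def shutdownAndAwait(executor: ScheduledThreadPoolExecutor, timeout: Long, unit: TimeUnit): Boolean = {
    executor.shutdown() // unlike shutdownNow(), running tasks are not interrupted
    try executor.awaitTermination(timeout, unit)
    catch {
      case _: InterruptedException =>
        Thread.currentThread().interrupt() // preserve the caller's interrupt status
        false
    }
  }
}
```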







[GitHub] [kafka] junrao commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-925172265


   @ccding: Regarding the side effect you mentioned, a segment may be renamed to .delete but not physically deleted. This seems fine, since on broker restart we have logic to delete all .delete files.
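   For illustration only, a hypothetical helper showing the kind of startup cleanup described above: files in a log directory that carry the deleted-segment suffix are removed. Kafka's log code uses the suffix `.deleted`, which the comment abbreviates; the helper name and its flat, single-directory scan are assumptions, not Kafka's recovery code.

```scala
import java.io.File

object DeleteSuffixCleanup {
  // Removes leftover files ending in `suffix` from a single log directory.
  // Returns the sorted names of the files that were actually deleted.
  def deleteLeftovers(logDir: File, suffix: String = ".deleted"): Seq[String] = {
    val files = Option(logDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    val leftovers = files.filter(f => f.isFile && f.getName.endsWith(suffix))
    leftovers.filter(_.delete()).map(_.getName).sorted
  }
}
```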





[GitHub] [kafka] ccding edited a comment on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding edited a comment on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-925201730


   @junrao for the side effect, I am saying
   ```
   sortedNewSegments.foreach(_.changeFileSuffixes(SwapFileSuffix, "")) 
   ```
   will execute. #10763 mainly targets the case where this line doesn't run before shutdown.





[GitHub] [kafka] ccding commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-925334750


   I pushed the two commits separately. It appears Jenkins is currently running on the first commit while the second one is pending. Unfortunately, I don't have permission to stop the run on the first one.





[GitHub] [kafka] ccding commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-925205714


   Addressed all comments above. PTAL





[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714439705



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -53,7 +53,7 @@ trait Scheduler {
    * @param unit The unit for the preceding times.
    * @return A Future object to manage the task scheduled.
    */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : Option[ScheduledFuture[_]]

Review comment:
       The question is whether we need to let the caller know that the schedule call has failed. The current codebase has no such case, and I am not sure whether we will need one in the future.
   
   Please let me know if you still think we should return a Future that does nothing. I can update the PR.







[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714410283



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -103,11 +103,14 @@ class KafkaScheduler(val threads: Int,
     schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
   }
 
-  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): ScheduledFuture[_] = {
+  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): Option[ScheduledFuture[_]] = {
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time task '%s' is scheduled. The task is ignored.".format(name))
+        return None

Review comment:
       Thanks. Fixed it. I somehow got the habit from other languages of returning first in error cases.







[GitHub] [kafka] ijuma commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714467779



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -53,7 +53,7 @@ trait Scheduler {
    * @param unit The unit for the preceding times.
    * @return A Future object to manage the task scheduled.
    */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : Option[ScheduledFuture[_]]

Review comment:
       Yeah, it seems that we are not using this functionality at the moment so not sure if the additional complexity helps. Also, we can't guarantee that the submission will not go through just before the scheduler is closed. So, it gives a misleading impression that we can count on this behavior.







[GitHub] [kafka] junrao merged pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #11351:
URL: https://github.com/apache/kafka/pull/11351


   





[GitHub] [kafka] ijuma commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714082956



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       @junrao From the Javadoc of `shutdown`:
   
   "Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down."
   
   Maybe you were thinking of `shutdownNow`?
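To make the difference concrete, here is a small sketch against a plain JDK `ScheduledExecutorService` (the `ShutdownSemantics` object and the 200 ms sleep standing in for in-flight work are illustrative assumptions). With `shutdown()` the running task finishes; with `shutdownNow()` it is interrupted. In both cases new submissions are rejected.

```scala
import java.util.concurrent.{CountDownLatch, Executors, RejectedExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ShutdownSemantics {
  private def runnable(body: => Unit): Runnable = new Runnable { def run(): Unit = body }

  // Starts one long-running task, closes the executor, then tries to submit another.
  // Returns (runningTaskCompleted, runningTaskInterrupted, newTaskRejected).
  def closeWith(useShutdownNow: Boolean): (Boolean, Boolean, Boolean) = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val started = new CountDownLatch(1)
    val completed = new AtomicBoolean(false)
    val interrupted = new AtomicBoolean(false)
    executor.execute(runnable {
      started.countDown()
      try {
        Thread.sleep(200) // stands in for in-flight work
        completed.set(true)
      } catch {
        case _: InterruptedException => interrupted.set(true)
      }
    })
    started.await()
    // shutdown() lets the running task finish; shutdownNow() interrupts it.
    if (useShutdownNow) executor.shutdownNow() else executor.shutdown()
    // Either way, no new tasks are accepted.
    val rejected =
      try { executor.execute(runnable(())); false }
      catch { case _: RejectedExecutionException => true }
    executor.awaitTermination(5, TimeUnit.SECONDS)
    (completed.get, interrupted.get, rejected)
  }
}
```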







[GitHub] [kafka] ijuma commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713489901



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       Are all scheduled tasks of the async variety? It would be nice if we stopped issuing new async tasks once shutdown starts, but let the existing ones complete.







[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714283329



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -53,7 +53,7 @@ trait Scheduler {
    * @param unit The unit for the preceding times.
    * @return A Future object to manage the task scheduled.
    */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : Option[ScheduledFuture[_]]

Review comment:
       Could we update the comment on the return value (`@return`) accordingly?

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -537,12 +537,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
   newGauge(LogMetricNames.Size, () => size, tags)
 
-  val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
+  val producerExpireCheckOption = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
     lock synchronized {
       producerStateManager.removeExpiredProducers(time.milliseconds)
     }
   }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
 
+  if (producerExpireCheckOption.isEmpty)
+    throw new IllegalStateException("Failed to schedule PeriodicProducerExpirationCheck with KafkaScheduler.")

Review comment:
       If we get here, it's because we are shutting down the broker, so this doesn't seem to be an illegal state. Perhaps we could just skip scheduling the check instead of throwing?
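A sketch of the caller side under the reviewer's suggestion: keep the `Option` and treat "not scheduled" as a normal outcome during shutdown. `PeriodicCheckOwner` is a hypothetical class loosely modelled on the `UnifiedLog` diff above, and passing `None` for the executor is an assumption standing in for a scheduler that has already been shut down.

```scala
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

// Hypothetical owner of a periodic check, loosely modelled on the diff above.
// `executor = None` stands for a scheduler that has already been shut down.
class PeriodicCheckOwner(executor: Option[ScheduledThreadPoolExecutor], periodMs: Long) {
  private val check = new Runnable {
    def run(): Unit = () // e.g. expire idle producer state
  }

  // No exception when scheduling is impossible: the check is simply absent.
  val producerExpireCheck: Option[ScheduledFuture[_]] =
    executor.map(_.scheduleAtFixedRate(check, periodMs, periodMs, TimeUnit.MILLISECONDS))

  // Cancelling is a no-op when nothing was scheduled.
  def close(): Unit = producerExpireCheck.foreach(_.cancel(true))
}
```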

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       @ccding: I missed that part. `awaitTermination` with a 1-day timeout is fine.







[GitHub] [kafka] ccding commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-925201730


   @junrao Regarding the side effect, I mainly mean that
   ```
   sortedNewSegments.foreach(_.changeFileSuffixes(SwapFileSuffix, "")) 
   ```
   will execute. #10763 mainly targets the case where this line does not run before shutdown.





[GitHub] [kafka] ccding commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-924557403


   FYI: not throwing `IllegalStateException` has a side effect: `deleteSegment` will not throw, so we will be able to rename the `.swap` files to regular files during shutdown. This is a good thing.
   
   https://github.com/apache/kafka/blob/5a6f19b2a1ff72c52ad627230ffdf464456104ee/core/src/main/scala/kafka/log/LocalLog.scala#L895-L909
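Renaming a `.swap` file to a regular file amounts to dropping the `.swap` suffix from its name. A minimal sketch with `java.nio.file`, assuming a hypothetical `completeSwap` helper (not Kafka's implementation) and a filesystem that supports atomic moves within a directory:

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object SwapRename {
  val SwapFileSuffix = ".swap"

  // Hypothetical helper: renames e.g. "00000000000000000000.log.swap" to
  // "00000000000000000000.log" in the same directory, atomically.
  def completeSwap(file: Path): Path = {
    val name = file.getFileName.toString
    require(name.endsWith(SwapFileSuffix), s"$name is not a swap file")
    val target = file.resolveSibling(name.stripSuffix(SwapFileSuffix))
    // Throws AtomicMoveNotSupportedException on filesystems without atomic rename.
    Files.move(file, target, StandardCopyOption.ATOMIC_MOVE)
  }
}
```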





[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714121176



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       IIUC, the code (1) lets running tasks complete, (2) waits for tasks scheduled with `executor.schedule(runnable, delay, unit)` to finish after their delay, and (3) cancels future executions of tasks scheduled with `executor.scheduleAtFixedRate(runnable, delay, period, unit)`.
   
   https://github.com/apache/kafka/blob/b61ec0003f907b61243102811fdb2e92f8d7d2c5/core/src/main/scala/kafka/utils/KafkaScheduler.scala#L121-L124
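Whether (2) and (3) hold depends on two `ScheduledThreadPoolExecutor` policies, so it is worth checking which values a given scheduler sets (`KafkaScheduler` configures its executor in `startup()`). A sketch on a bare JDK executor, with a hypothetical `ShutdownPolicies` object; the 1-second delays are only there so that `shutdown()` runs before either task fires:

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ShutdownPolicies {
  private def task(body: => Unit): Runnable = new Runnable { def run(): Unit = body }

  // Schedules one delayed one-shot task and one periodic task, calls shutdown()
  // before either fires, and reports how many times each ran afterwards.
  def runsAfterShutdown(executeDelayed: Boolean): (Int, Int) = {
    val executor = new ScheduledThreadPoolExecutor(1)
    // JDK default is true: delayed one-shot tasks still run after shutdown().
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeDelayed)
    // JDK default is false: periodic tasks are cancelled by shutdown().
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    val delayedRuns = new AtomicInteger(0)
    val periodicRuns = new AtomicInteger(0)
    executor.schedule(task { delayedRuns.incrementAndGet(); () }, 1, TimeUnit.SECONDS)
    executor.scheduleAtFixedRate(task { periodicRuns.incrementAndGet(); () }, 1, 1, TimeUnit.SECONDS)
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
    (delayedRuns.get, periodicRuns.get)
  }
}
```

With the JDK defaults the delayed task still runs once and the periodic task never does; turning the delayed-task policy off cancels both.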
   







[GitHub] [kafka] kowshik commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714401442



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -103,11 +103,14 @@ class KafkaScheduler(val threads: Int,
     schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
   }
 
-  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): ScheduledFuture[_] = {
+  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): Option[ScheduledFuture[_]] = {
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time task '%s' is scheduled. The task is ignored.".format(name))
+        return None

Review comment:
       With a little refactoring, it looks possible to avoid the `return` statement here.
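One possible shape of that refactor, as a sketch: since `if`/`else` is an expression in Scala, each branch can yield the result directly. `NoReturnScheduler` is hypothetical, `println` stands in for the scheduler's `info` logging, and treating a non-positive period as a one-shot task is an assumption of this sketch.

```scala
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

// Hypothetical scheduler fragment; `executor` is null whenever the scheduler is not running.
class NoReturnScheduler(threads: Int) {
  private var executor: ScheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(threads)

  def isStarted: Boolean = synchronized { executor != null }

  def shutdown(): Unit = synchronized {
    if (executor != null) {
      executor.shutdown()
      executor = null
    }
  }

  // The if/else is the method's result, so no `return` is needed.
  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): Option[ScheduledFuture[_]] =
    this synchronized {
      if (!isStarted) {
        // println stands in for the scheduler's logger in this sketch.
        println(s"Scheduler is not running at the time task '$name' is scheduled. The task is ignored.")
        None
      } else {
        val runnable = new Runnable { def run(): Unit = fun() }
        // Assumption of this sketch: a non-positive period means a one-shot task.
        val future: ScheduledFuture[_] =
          if (period > 0) executor.scheduleAtFixedRate(runnable, delay, period, unit)
          else executor.schedule(runnable, delay, unit)
        Some(future)
      }
    }
}
```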







[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r715068733



##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -141,9 +145,14 @@ class KafkaScheduler(val threads: Int,
       executor != null
     }
   }
-  
-  private def ensureRunning(): Unit = {
-    if (!isStarted)
-      throw new IllegalStateException("Kafka scheduler is not running.")
-  }
+}
+
+private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+  override def isCancelled: Boolean = true
+  override def isDone: Boolean = true
+  override def get(): Unit = {}
+  override def get(timeout: Long, unit: TimeUnit): Unit = {}
+  override def getDelay(unit: TimeUnit): Long = 0
+  override def compareTo(o: Delayed): Int = 0

Review comment:
       Should `Delayed` be `ScheduledFuture`? Also, instead of always returning 0, would it be better to return 0 if the other instance is a `NoOpScheduledFutureTask` and -1 or 1 otherwise?
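A sketch of the suggested `compareTo`, for illustration only. Note that the parameter type is fixed by the JDK: `ScheduledFuture` extends `Delayed`, which extends `Comparable<Delayed>`, so the override must take a `Delayed`. Returning -1 (rather than 1) for other instances is an arbitrary choice here; the reviewer leaves it open.

```scala
import java.util.concurrent.{Delayed, ScheduledFuture, TimeUnit}

// Sketch of a no-op future; compareTo follows the reviewer's suggestion.
class NoOpScheduledFutureTask extends ScheduledFuture[Unit] {
  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
  override def isCancelled(): Boolean = true
  override def isDone(): Boolean = true
  override def get(): Unit = ()
  override def get(timeout: Long, unit: TimeUnit): Unit = ()
  override def getDelay(unit: TimeUnit): Long = 0
  // Equal to other no-op futures; ordered before any other Delayed (arbitrary choice).
  override def compareTo(o: Delayed): Int = o match {
    case _: NoOpScheduledFutureTask => 0
    case _                          => -1
  }
}
```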







[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713494615



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       It seems that none of the scheduled calls depends on the completion of the returned future. When shutting down KafkaScheduler, we call ScheduledThreadPoolExecutor.shutdown(), which doesn't wait for all existing tasks to complete. This seems ok since for important tasks (e.g. flushing the log), we make explicit calls during shutdown.



