You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/09/23 23:38:37 UTC

[kafka] branch 3.0 updated: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown (#11351)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 954c66b  KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown (#11351)
954c66b is described below

commit 954c66bdec807e04ed9cba5e135daa23077d4a8e
Author: Cong Ding <co...@ccding.com>
AuthorDate: Thu Sep 23 18:28:39 2021 -0500

    KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown (#11351)
    
    This also fixes KAFKA-13070.
    
    We have seen a problem caused by shutting down the scheduler before shutting down LogManager.
    
    When LogManager was closing partitions one by one, the scheduler called to delete old segments due to retention. However, the old segments could have been closed by the LogManager, which caused an exception and subsequently marked logdir as offline. As a result, the broker didn't flush the remaining partitions and didn't write the clean shutdown marker. Ultimately the broker took hours to recover the log during restart.
    
    This PR essentially reverts #10538
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Kowshik Prakasam <kp...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala | 16 +++++--
 core/src/main/scala/kafka/server/KafkaServer.scala | 16 +++++--
 .../main/scala/kafka/utils/KafkaScheduler.scala    | 50 ++++++++++++++--------
 3 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d2079c4..6bf3217 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -453,6 +453,19 @@ class BrokerServer(
       }
       metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
 
+      /**
+       * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+       * resources that might have been shutdown and cause exceptions.
+       * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+       * partitions one by one, the scheduler might concurrently delete old segments due to
+       * retention. However, the old segments could have been closed by the LogManager, which would
+       * cause an IOException and subsequently mark logdir as offline. As a result, the broker would
+       * not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
+       * broker would have to take hours to recover the log during restart.
+       */
+      if (kafkaScheduler != null)
+        CoreUtils.swallow(kafkaScheduler.shutdown(), this)
+
       if (transactionCoordinator != null)
         CoreUtils.swallow(transactionCoordinator.shutdown(), this)
       if (groupCoordinator != null)
@@ -472,9 +485,6 @@ class BrokerServer(
 
       if (logManager != null)
         CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
-      if (kafkaScheduler != null)
-        CoreUtils.swallow(kafkaScheduler.shutdown(), this)
 
       if (quotaManagers != null)
         CoreUtils.swallow(quotaManagers.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c22eab2..96576cd 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -672,6 +672,19 @@ class KafkaServer(
         if (controlPlaneRequestHandlerPool != null)
           CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
 
+        /**
+         * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+         * resources that might have been shutdown and cause exceptions.
+         * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+         * partitions one by one, the scheduler might concurrently delete old segments due to
+         * retention. However, the old segments could have been closed by the LogManager, which would
+         * cause an IOException and subsequently mark logdir as offline. As a result, the broker would
+         * not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
+         * broker would have to take hours to recover the log during restart.
+         */
+        if (kafkaScheduler != null)
+          CoreUtils.swallow(kafkaScheduler.shutdown(), this)
+
         if (dataPlaneRequestProcessor != null)
           CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
         if (controlPlaneRequestProcessor != null)
@@ -699,9 +712,6 @@ class KafkaServer(
 
         if (logManager != null)
           CoreUtils.swallow(logManager.shutdown(), this)
-        // be sure to shutdown scheduler after log manager
-        if (kafkaScheduler != null)
-          CoreUtils.swallow(kafkaScheduler.shutdown(), this)
 
         if (kafkaController != null)
           CoreUtils.swallow(kafkaController.shutdown(), this)
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index f1585af..bec511b 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -21,6 +21,8 @@ import java.util.concurrent._
 import atomic._
 import org.apache.kafka.common.utils.KafkaThread
 
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
 /**
  * A scheduler for running jobs
  * 
@@ -107,21 +109,25 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
-      val runnable: Runnable = () => {
-        try {
-          trace("Beginning execution of scheduled task '%s'.".format(name))
-          fun()
-        } catch {
-          case t: Throwable => error(s"Uncaught exception in scheduled task '$name'", t)
-        } finally {
-          trace("Completed execution of scheduled task '%s'.".format(name))
+      if (isStarted) {
+        val runnable: Runnable = () => {
+          try {
+            trace("Beginning execution of scheduled task '%s'.".format(name))
+            fun()
+          } catch {
+            case t: Throwable => error(s"Uncaught exception in scheduled task '$name'", t)
+          } finally {
+            trace("Completed execution of scheduled task '%s'.".format(name))
+          }
         }
+        if (period >= 0)
+          executor.scheduleAtFixedRate(runnable, delay, period, unit)
+        else
+          executor.schedule(runnable, delay, unit)
+      } else {
+        info("Kafka scheduler is not running at the time task '%s' is scheduled. The task is ignored.".format(name))
+        new NoOpScheduledFutureTask
       }
-      if (period >= 0)
-        executor.scheduleAtFixedRate(runnable, delay, period, unit)
-      else
-        executor.schedule(runnable, delay, unit)
     }
   }
 
@@ -141,9 +147,19 @@ class KafkaScheduler(val threads: Int,
       executor != null
     }
   }
-  
-  private def ensureRunning(): Unit = {
-    if (!isStarted)
-      throw new IllegalStateException("Kafka scheduler is not running.")
+}
+
+private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+  override def isCancelled: Boolean = true
+  override def isDone: Boolean = true
+  override def get(): Unit = {}
+  override def get(timeout: Long, unit: TimeUnit): Unit = {}
+  override def getDelay(unit: TimeUnit): Long = 0
+  override def compareTo(o: Delayed): Int = {
+    val diff = getDelay(NANOSECONDS) - o.getDelay(NANOSECONDS)
+    if (diff < 0) -1
+    else if (diff > 0) 1
+    else 0
   }
 }