You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/15 01:59:27 UTC

[GitHub] [kafka] kowshik opened a new pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

kowshik opened a new pull request #9596:
URL: https://github.com/apache/kafka/pull/9596


   The asynchronous shutdown in `LogManager` has the shortcoming that if during shutdown any of the internal futures fail, then we do not always ensure that all futures are completed before `LogManager.shutdown` returns. As a result, despite the `shut down completed` message from KafkaServer is seen in the error logs, some futures continue to run from inside LogManager attempting to close the logs. This is misleading and it could possibly break the general rule of avoiding post-shutdown activity in the Broker.
   
   In this PR, we fix he above behavior such that we prevent leakage of threads. If any of the futures throw an error, we  skip creating of checkpoint and clean shutdown file only for the affected log directory. We continue to wait for all futures to complete for all the directories.
   
   **Test plan:**
   
   Added a new unit test: `LogManager. testHandlingExceptionsDuringShutdown`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r524853231



##########
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##########
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()

Review comment:
       Thinking about it again, you are right. I have eliminated the need for the `shutdown()` now by using a `LogManager` instance specific to the test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik edited a comment on pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik edited a comment on pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#issuecomment-727523253


   cc @dhruvilshah3 @junrao @hachikuji for review


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r525850037



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val hasErrors = dirJobs.exists {

Review comment:
       Thats a really good point. Done.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val hasErrors = dirJobs.exists {
+          future =>

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val hasErrors = dirJobs.exists {
+          future =>
+            try {

Review comment:
       Good idea, done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r524345459



##########
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##########
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()
+
+    // We create two directories logDir1 and logDir2 to help effectively test error handling
+    // during LogManager.shutdown().
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    assertEquals(2, logManager.liveLogDirs.size)
+    logManager.startup()
+
+    val log1 = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log2 = logManager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)
+
+    val logFile1 = new File(logDir1, name + "-0")
+    assertTrue(logFile1.exists)
+    val logFile2 = new File(logDir2, name + "-1")
+    assertTrue(logFile2.exists)
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+    log2.takeProducerSnapshot()
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+
+    // This should cause log1.close() to fail during LogManger shutdown sequence.
+    FileUtils.deleteDirectory(logFile1)

Review comment:
       Sorry I do not understand the question.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r524673732



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -477,27 +477,41 @@ class LogManager(logDirs: Seq[File],
       jobs(dir) = jobsForDir.map(pool.submit).toSeq
     }
 
+    var firstExceptionOpt: Option[Throwable] = Option.empty
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val errorsForDirJobs = dirJobs.map {
+          future =>
+            try {
+              future.get
+              Option.empty
+            } catch {
+              case e: ExecutionException =>
+                error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
+                Some(e.getCause)
+            }
+        }.filter{ e => e.isDefined }.map{ e => e.get }
+
+        if (firstExceptionOpt.isEmpty) {
+          firstExceptionOpt = errorsForDirJobs.headOption
+        }
 
-        val logs = logsInDir(localLogsByDir, dir)
+        if (errorsForDirJobs.isEmpty) {
+          val logs = logsInDir(localLogsByDir, dir)
 
-        // update the last flush point
-        debug(s"Updating recovery points at $dir")
-        checkpointRecoveryOffsetsInDir(dir, logs)
+          // update the last flush point
+          debug(s"Updating recovery points at $dir")
+          checkpointRecoveryOffsetsInDir(dir, logs)
 
-        debug(s"Updating log start offsets at $dir")
-        checkpointLogStartOffsetsInDir(dir, logs)
+          debug(s"Updating log start offsets at $dir")
+          checkpointLogStartOffsetsInDir(dir, logs)
 
-        // mark that the shutdown was clean by creating marker file
-        debug(s"Writing clean shutdown marker at $dir")
-        CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
+          // mark that the shutdown was clean by creating marker file
+          debug(s"Writing clean shutdown marker at $dir")
+          CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
+        }
       }
-    } catch {
-      case e: ExecutionException =>
-        error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
-        throw e.getCause
+      firstExceptionOpt.foreach{ e => throw e}

Review comment:
       Hmm, since we are about to shut down the JVM, should we just log a WARN here instead of throwing the exception?

##########
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##########
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()

Review comment:
       Hmm, do we need this given that we do this in tearDown() already?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao merged pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #9596:
URL: https://github.com/apache/kafka/pull/9596


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lqjack commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
lqjack commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r524053292



##########
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##########
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()
+
+    // We create two directories logDir1 and logDir2 to help effectively test error handling
+    // during LogManager.shutdown().
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    assertEquals(2, logManager.liveLogDirs.size)
+    logManager.startup()
+
+    val log1 = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log2 = logManager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)
+
+    val logFile1 = new File(logDir1, name + "-0")
+    assertTrue(logFile1.exists)
+    val logFile2 = new File(logDir2, name + "-1")
+    assertTrue(logFile2.exists)
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+    log2.takeProducerSnapshot()
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+
+    // This should cause log1.close() to fail during LogManger shutdown sequence.
+    FileUtils.deleteDirectory(logFile1)

Review comment:
       If the end user delete the log files Manually , the server cannot be stopped. and The cannot startup it again? so in this case ,how do they resolve it ?  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lqjack commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
lqjack commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r524907470



##########
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##########
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()
+
+    // We create two directories logDir1 and logDir2 to help effectively test error handling
+    // during LogManager.shutdown().
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    assertEquals(2, logManager.liveLogDirs.size)
+    logManager.startup()
+
+    val log1 = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log2 = logManager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)
+
+    val logFile1 = new File(logDir1, name + "-0")
+    assertTrue(logFile1.exists)
+    val logFile2 = new File(logDir2, name + "-1")
+    assertTrue(logFile2.exists)
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+    log2.takeProducerSnapshot()
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+
+    // This should cause log1.close() to fail during LogManger shutdown sequence.
+    FileUtils.deleteDirectory(logFile1)

Review comment:
       What if error occur during the shutdown of the broker ?  should we log the error info to the log or just throw the exception ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#issuecomment-729473539


   Thanks for the review @ijuma ! I have addressed the comments in 8716429b48cad8af6ad73109c1d9f7442823c02f .


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r525527982



##########
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##########
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()
+
+    // We create two directories logDir1 and logDir2 to help effectively test error handling
+    // during LogManager.shutdown().
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    assertEquals(2, logManager.liveLogDirs.size)
+    logManager.startup()
+
+    val log1 = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log2 = logManager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)
+
+    val logFile1 = new File(logDir1, name + "-0")
+    assertTrue(logFile1.exists)
+    val logFile2 = new File(logDir2, name + "-1")
+    assertTrue(logFile2.exists)
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+    log2.takeProducerSnapshot()
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+
+    // This should cause log1.close() to fail during LogManger shutdown sequence.
+    FileUtils.deleteDirectory(logFile1)

Review comment:
       It depends on the kind of error, but we do log the error information to the log today from within `KafkaServer.shutdown()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r525835495



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val hasErrors = dirJobs.exists {

Review comment:
       This looks wrong. `exists` short-circuits. I think you want `map` followed by `exists`.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val hasErrors = dirJobs.exists {
+          future =>

Review comment:
       Nit: this should be in the previous line.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val hasErrors = dirJobs.exists {
+          future =>
+            try {

Review comment:
       You can use `scala.util.Try` to wrap the call and get a `Success` or `Failure`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r524848584



##########
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##########
@@ -83,6 +87,51 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()

Review comment:
       Yeah this explicit shutdown is needed to:
   1) Re-create a new `LogManager` instance with multiple `logDirs` for this test. This is different from the default one provided in `setUp()`.
   2) Help do some additional checks post shutdown (towards the end of this test).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r524847888



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -477,27 +477,41 @@ class LogManager(logDirs: Seq[File],
       jobs(dir) = jobsForDir.map(pool.submit).toSeq
     }
 
+    var firstExceptionOpt: Option[Throwable] = Option.empty
     try {
       for ((dir, dirJobs) <- jobs) {
-        dirJobs.foreach(_.get)
+        val errorsForDirJobs = dirJobs.map {
+          future =>
+            try {
+              future.get
+              Option.empty
+            } catch {
+              case e: ExecutionException =>
+                error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
+                Some(e.getCause)
+            }
+        }.filter{ e => e.isDefined }.map{ e => e.get }
+
+        if (firstExceptionOpt.isEmpty) {
+          firstExceptionOpt = errorsForDirJobs.headOption
+        }
 
-        val logs = logsInDir(localLogsByDir, dir)
+        if (errorsForDirJobs.isEmpty) {
+          val logs = logsInDir(localLogsByDir, dir)
 
-        // update the last flush point
-        debug(s"Updating recovery points at $dir")
-        checkpointRecoveryOffsetsInDir(dir, logs)
+          // update the last flush point
+          debug(s"Updating recovery points at $dir")
+          checkpointRecoveryOffsetsInDir(dir, logs)
 
-        debug(s"Updating log start offsets at $dir")
-        checkpointLogStartOffsetsInDir(dir, logs)
+          debug(s"Updating log start offsets at $dir")
+          checkpointLogStartOffsetsInDir(dir, logs)
 
-        // mark that the shutdown was clean by creating marker file
-        debug(s"Writing clean shutdown marker at $dir")
-        CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
+          // mark that the shutdown was clean by creating marker file
+          debug(s"Writing clean shutdown marker at $dir")
+          CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
+        }
       }
-    } catch {
-      case e: ExecutionException =>
-        error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
-        throw e.getCause
+      firstExceptionOpt.foreach{ e => throw e}

Review comment:
       Great point. I've changed the code to do the same.
   My understanding is that the exception swallow safety net exists inside `KafkaServer.shutdown()` today, but it makes sense to also just log a warning here instead instead of relying on the safety net: https://github.com/apache/kafka/blob/bb34c5c8cc32d1b769a34329e34b83cda040aafc/core/src/main/scala/kafka/server/KafkaServer.scala#L732.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#issuecomment-728654950


   Thanks for the review @junrao! I have addressed the comments in f917f0c24cebbb0fb5eb7029ccb6676734b60b3e.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#issuecomment-727523253


   cc @dhruvilshah3 @junrao @hachikuji for review/discussion


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org