You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/01 00:33:58 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r886228739


##########
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##########
@@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
     createTopic(topic)
     shutdownBroker()
     config.logDirs.foreach { dirName =>
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
     }
-    verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
+
+    val expectedStatusCode = Some(1)
+    var receivedStatusCode = Option.empty[Int]

Review Comment:
   Should `receivedStatusCode` be marked `@volatile`, since it's set in a different thread?



##########
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##########
@@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
     createTopic(topic)
     shutdownBroker()
     config.logDirs.foreach { dirName =>
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
     }
-    verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
+
+    val expectedStatusCode = Some(1)
+    var receivedStatusCode = Option.empty[Int]
+    Exit.setHaltProcedure((statusCode, _) => {
+      receivedStatusCode = Some(statusCode)
+    }.asInstanceOf[Nothing])
+
+    try {
+      val recreateBrokerExec: Executable = () => recreateBroker(true)
+      // this startup should fail with no online log dir (due to corrupted log), and exit directly without throwing exception
+      assertDoesNotThrow(recreateBrokerExec)
+      // JVM should exit with status code 1
+      assertEquals(expectedStatusCode, receivedStatusCode)
+    } finally {
+      Exit.resetExitProcedure()

Review Comment:
   Should this be `Exit.resetHaltProcedure()`? The test installs a halt procedure via `Exit.setHaltProcedure`, not an exit procedure.



##########
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##########
@@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
     createTopic(topic)
     shutdownBroker()
     config.logDirs.foreach { dirName =>
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
     }
-    verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
+
+    val expectedStatusCode = Some(1)
+    var receivedStatusCode = Option.empty[Int]
+    Exit.setHaltProcedure((statusCode, _) => {
+      receivedStatusCode = Some(statusCode)
+    }.asInstanceOf[Nothing])
+
+    try {
+      val recreateBrokerExec: Executable = () => recreateBroker(true)
+      // this startup should fail with no online log dir (due to corrupted log), and exit directly without throwing exception
+      assertDoesNotThrow(recreateBrokerExec)
+      // JVM should exit with status code 1
+      assertEquals(expectedStatusCode, receivedStatusCode)

Review Comment:
   LogDirFailureHandler sets the exit flag but runs in a separate thread. Should we do this `assertEquals` check in a waitUntil() block, so the assertion doesn't race with that thread?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org