You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/05/17 08:32:04 UTC

[kafka] branch trunk updated: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs (#12165)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f039bae1c7 KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs (#12165)
5f039bae1c7 is described below

commit 5f039bae1c74ac465a1a4f47a4721df7f15a4366
Author: dengziming <de...@gmail.com>
AuthorDate: Tue May 17 16:31:28 2022 +0800

    KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs (#12165)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 .../unit/kafka/server/ServerShutdownTest.scala     | 34 +++++++++++++++-------
 1 file changed, 23 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 0464a340744..08c00bcae6d 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -144,15 +144,15 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testCleanShutdownAfterFailedStartup(quorum: String): Unit = {
-    if (quorum == "zk") {
-      propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
-      propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
-      verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException](quorum)
-    } else {
+    if (isKRaftTest()) {
       propsToChangeUponRestart.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "1000")
       shutdownBroker()
       shutdownKRaftController()
-      verifyCleanShutdownAfterFailedStartup[CancellationException](quorum)
+      verifyCleanShutdownAfterFailedStartup[CancellationException]
+    } else {
+      propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
+      propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
+      verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException]
     }
   }
 
@@ -165,7 +165,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
     }
-    verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum)
+    verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -177,6 +177,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
     verifyNonDaemonThreadsStatus()
   }
 
+  @Disabled
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("kraft"))
   def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {
@@ -186,7 +187,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
     verifyNonDaemonThreadsStatus()
   }
 
-  private def verifyCleanShutdownAfterFailedStartup[E <: Exception](quorum: String)(implicit exceptionClassTag: ClassTag[E]): Unit = {
+  private def verifyCleanShutdownAfterFailedStartup[E <: Exception](implicit exceptionClassTag: ClassTag[E]): Unit = {
     try {
       recreateBroker(startup = true)
       fail("Expected KafkaServer setup to fail and throw exception")
@@ -195,13 +196,24 @@ class ServerShutdownTest extends KafkaServerTestHarness {
       // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
       // goes wrong so that awaitShutdown doesn't hang
       case e: Exception =>
-        assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
-        assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+        assertCause(exceptionClassTag.runtimeClass, e)
+        assertEquals(if (isKRaftTest()) BrokerState.SHUTTING_DOWN else BrokerState.NOT_RUNNING, brokers.head.brokerState)
     } finally {
       shutdownBroker()
     }
   }
 
+  private def assertCause(expectedClass: Class[_], e: Throwable): Unit = {
+    var cause = e
+    while (cause != null) {
+      if (expectedClass.isInstance(cause)) {
+        return
+      }
+      cause = cause.getCause
+    }
+    fail(s"Failed to assert cause of $e, expected cause $expectedClass")
+  }
+
   private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = {
     !t.isDaemon && t.isAlive && t.getName.startsWith(this.getClass.getName)
   }