You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/05/17 08:32:04 UTC
[kafka] branch trunk updated: KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs (#12165)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f039bae1c7 KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs (#12165)
5f039bae1c7 is described below
commit 5f039bae1c74ac465a1a4f47a4721df7f15a4366
Author: dengziming <de...@gmail.com>
AuthorDate: Tue May 17 16:31:28 2022 +0800
KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs (#12165)
Reviewers: Jason Gustafson <ja...@confluent.io>, Luke Chen <sh...@gmail.com>
---
.../unit/kafka/server/ServerShutdownTest.scala | 34 +++++++++++++++-------
1 file changed, 23 insertions(+), 11 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 0464a340744..08c00bcae6d 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -144,15 +144,15 @@ class ServerShutdownTest extends KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCleanShutdownAfterFailedStartup(quorum: String): Unit = {
- if (quorum == "zk") {
- propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
- propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
- verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException](quorum)
- } else {
+ if (isKRaftTest()) {
propsToChangeUponRestart.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "1000")
shutdownBroker()
shutdownKRaftController()
- verifyCleanShutdownAfterFailedStartup[CancellationException](quorum)
+ verifyCleanShutdownAfterFailedStartup[CancellationException]
+ } else {
+ propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
+ propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
+ verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException]
}
}
@@ -165,7 +165,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
val partitionDir = new File(dirName, s"$topic-0")
partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
}
- verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum)
+ verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -177,6 +177,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
verifyNonDaemonThreadsStatus()
}
+ @Disabled
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("kraft"))
def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {
@@ -186,7 +187,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
verifyNonDaemonThreadsStatus()
}
- private def verifyCleanShutdownAfterFailedStartup[E <: Exception](quorum: String)(implicit exceptionClassTag: ClassTag[E]): Unit = {
+ private def verifyCleanShutdownAfterFailedStartup[E <: Exception](implicit exceptionClassTag: ClassTag[E]): Unit = {
try {
recreateBroker(startup = true)
fail("Expected KafkaServer setup to fail and throw exception")
@@ -195,13 +196,24 @@ class ServerShutdownTest extends KafkaServerTestHarness {
// identify the correct exception, making sure the server was shutdown, and cleaning up if anything
// goes wrong so that awaitShutdown doesn't hang
case e: Exception =>
- assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
- assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+ assertCause(exceptionClassTag.runtimeClass, e)
+ assertEquals(if (isKRaftTest()) BrokerState.SHUTTING_DOWN else BrokerState.NOT_RUNNING, brokers.head.brokerState)
} finally {
shutdownBroker()
}
}
+ private def assertCause(expectedClass: Class[_], e: Throwable): Unit = {
+ var cause = e
+ while (cause != null) {
+ if (expectedClass.isInstance(cause)) {
+ return
+ }
+ cause = cause.getCause
+ }
+ fail(s"Failed to assert cause of $e, expected cause $expectedClass")
+ }
+
private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = {
!t.isDaemon && t.isAlive && t.getName.startsWith(this.getClass.getName)
}