Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/30 15:10:55 UTC
[kafka] branch trunk updated: KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness (#11417)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new af8100b KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness (#11417)
af8100b is described below
commit af8100b94fda4a27511797233e9845078ae8a69f
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Sat Oct 30 08:00:34 2021 -0700
KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness (#11417)
Change ZooKeeperTestHarness to QuorumTestHarness so that integration tests which inherit from
this class can test Kafka in both ZK and KRaft mode. Test cases which do this can specify the
modes they support by including a ParameterizedTest annotation before each test case, like the
following:
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = { ... }
For each value that is specified here (zk, kraft), the test case will be run once in the
appropriate mode. So the test shown above is run twice. This allows integration tests to be
incrementally converted over to support KRaft mode, rather than rewritten to support it. For
now, test cases which do not specify a quorum argument will continue to run only in ZK mode.
JUnit5 makes the quorum annotation visible in the TestInfo object which each @BeforeEach
function in a test can optionally take. Therefore, this PR converts over the setUp function of
the quorum base class, plus every derived class, to take a TestInfo argument. The TestInfo
object gets "passed up the chain" to the base class, where it determines which quorum type we
create (ZK or KRaft). In a few cases, I discovered test cases inheriting from the test harness
that had more than one @BeforeEach function. Because the JUnit5 framework does not define the
order in which @BeforeEach hooks are run, I changed these to override setUp() instead, to avoid
undefined behavior.
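The "passed up the chain" pattern described above can be sketched roughly as follows. This is a minimal illustration, not the code from this patch: DisplayNamed is a hypothetical stand-in for JUnit 5's TestInfo so the sketch compiles without JUnit, the class names are invented, and it assumes the parameterized display name contains text such as "quorum=kraft".

```java
// Hypothetical stand-in for JUnit 5's TestInfo, so the sketch compiles without JUnit.
interface DisplayNamed {
    String getDisplayName();
}

// Hypothetical base harness: the only place that decides which quorum to create.
class SketchQuorumHarness {
    protected boolean kraft;

    // Assumption: the display name of a parameterized test contains "quorum=<value>".
    static boolean isKRaft(DisplayNamed testInfo) {
        String name = testInfo.getDisplayName();
        if (name.contains("quorum=kraft")) return true;
        if (name.contains("quorum=zk")) return false;
        if (name.contains("quorum=")) {
            throw new IllegalArgumentException("Unknown quorum value in: " + name);
        }
        return false; // no quorum argument: keep running in ZK mode
    }

    public void setUp(DisplayNamed testInfo) {
        kraft = isKRaft(testInfo);
    }
}

// Hypothetical derived test: one overridden setUp, so the ordering is explicit.
class SketchTopicTest extends SketchQuorumHarness {
    boolean topicCreated;

    @Override
    public void setUp(DisplayNamed testInfo) {
        super.setUp(testInfo); // the quorum is chosen first
        topicCreated = true;   // then the test-specific setup runs
    }
}
```

Because each class has exactly one setUp method that calls super.setUp(testInfo) at a known point, the order of the setup steps no longer depends on how the framework schedules multiple @BeforeEach hooks.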
The general approach taken here is to make as much as possible work with KRaft, but to leave some
things as ZK-only when appropriate. For example, a test that explicitly requests an AdminZkClient
object will get an exception if it is running in KRaft mode. Similarly, tests which explicitly
request KafkaServer rather than KafkaBroker will get an exception when running in KRaft mode.
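A guard of this kind might look like the sketch below. The class name, the placeholder field, and the exception type are all assumptions made for illustration; the patch itself may signal the error differently.

```java
// Hypothetical harness showing a ZK-only accessor guarded by the quorum mode.
class SketchModeAwareHarness {
    private final boolean kraft;
    // Placeholder object standing in for a ZK-only helper such as AdminZkClient.
    private final Object adminZkClient = new Object();

    SketchModeAwareHarness(boolean kraft) {
        this.kraft = kraft;
    }

    // Fails fast instead of returning a helper that cannot work without ZooKeeper.
    Object adminZkClient() {
        if (kraft) {
            throw new IllegalStateException("adminZkClient is not available in KRaft mode");
        }
        return adminZkClient;
    }
}
```

Failing at the accessor keeps ZK-only tests working unchanged while making it obvious which call sites still need conversion before a test can list "kraft" as a supported mode.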
As a proof of concept, this PR converts over kafka.api.MetricsTest to support KRaft.
This PR also renames the quorum controller event handler thread to include the text
"QuorumControllerEventHandler". This allows QuorumTestHarness to check for hanging quorum
controller threads, as it does for hanging ZK-based controller threads.
Finally, ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
caused many failing test runs. Therefore, I disabled it here and filed KAFKA-13421 to fix the
test logic to be more reliable.
Reviewers: Jason Gustafson <ja...@confluent.io>, Igor Soarez <so...@apple.com>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 10 +-
core/src/main/scala/kafka/server/KafkaBroker.scala | 5 +
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +-
.../test/junit/ZkClusterInvocationContext.java | 3 +-
.../kafka/admin/ListOffsetsIntegrationTest.scala | 6 +-
.../admin/ReassignPartitionsIntegrationTest.scala | 4 +-
.../kafka/api/AbstractConsumerTest.scala | 6 +-
.../AdminClientWithPoliciesIntegrationTest.scala | 6 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 9 +-
.../kafka/api/BaseAdminIntegrationTest.scala | 6 +-
.../kafka/api/BaseProducerSendTest.scala | 6 +-
.../integration/kafka/api/BaseQuotaTest.scala | 6 +-
.../integration/kafka/api/ClientIdQuotaTest.scala | 6 +-
.../integration/kafka/api/ConsumerBounceTest.scala | 1 +
.../kafka/api/ConsumerTopicCreationTest.scala | 6 +-
.../kafka/api/CustomQuotaCallbackTest.scala | 6 +-
.../DelegationTokenEndToEndAuthorizationTest.scala | 7 +-
.../api/DescribeAuthorizedOperationsTest.scala | 6 +-
.../kafka/api/EndToEndAuthorizationTest.scala | 12 +-
.../kafka/api/EndToEndClusterIdTest.scala | 6 +-
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 7 +-
.../kafka/api/IntegrationTestHarness.scala | 45 ++-
.../integration/kafka/api/LogAppendTimeTest.scala | 6 +-
.../scala/integration/kafka/api/MetricsTest.scala | 6 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 7 +-
.../api/PlaintextEndToEndAuthorizationTest.scala | 6 +-
.../kafka/api/ProducerCompressionTest.scala | 10 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 6 +-
.../SaslClientsWithInvalidCredentialsTest.scala | 6 +-
.../kafka/api/SaslEndToEndAuthorizationTest.scala | 6 +-
.../kafka/api/SaslMultiMechanismConsumerTest.scala | 6 +-
.../kafka/api/SaslPlainPlaintextConsumerTest.scala | 8 +-
.../kafka/api/SaslPlaintextConsumerTest.scala | 6 +-
.../SaslScramSslEndToEndAuthorizationTest.scala | 6 +-
.../kafka/api/SaslSslAdminIntegrationTest.scala | 9 +-
.../kafka/api/SaslSslConsumerTest.scala | 6 +-
.../kafka/api/SslEndToEndAuthorizationTest.scala | 6 +-
.../kafka/api/TransactionsExpirationTest.scala | 6 +-
.../integration/kafka/api/TransactionsTest.scala | 6 +-
.../api/TransactionsWithMaxInFlightOneTest.scala | 6 +-
.../kafka/api/UserClientIdQuotaTest.scala | 6 +-
.../integration/kafka/api/UserQuotaTest.scala | 6 +-
.../kafka/network/DynamicConnectionQuotaTest.scala | 6 +-
.../server/DynamicBrokerReconfigurationTest.scala | 11 +-
.../kafka/server/GssapiAuthenticationTest.scala | 6 +-
...ListenersWithSameSecurityProtocolBaseTest.scala | 10 +-
.../kafka/server/QuorumTestHarness.scala | 339 +++++++++++++++++++++
.../kafka/tools/MirrorMakerIntegrationTest.scala | 8 +-
.../scala/kafka/tools/GetOffsetShellTest.scala | 5 +-
.../src/test/scala/kafka/utils/TestInfoUtils.scala | 46 +++
.../other/kafka/ReplicationQuotasTestRig.scala | 10 +-
.../scala/unit/kafka/admin/AclCommandTest.scala | 10 +-
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 6 +-
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 6 +-
.../kafka/admin/ConsumerGroupCommandTest.scala | 6 +-
.../kafka/admin/DelegationTokenCommandTest.scala | 6 +-
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 6 +-
.../kafka/admin/TopicCommandIntegrationTest.scala | 4 +-
.../ZkNodeChangeNotificationListenerTest.scala | 11 +-
.../controller/ControllerIntegrationTest.scala | 10 +-
.../kafka/integration/KafkaServerTestHarness.scala | 113 +++++--
.../MetricsDuringTopicCreationDeletionTest.scala | 6 +-
.../integration/UncleanLeaderElectionTest.scala | 10 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 118 ++++---
.../kafka/security/auth/ZkAuthorizationTest.scala | 9 +-
.../security/authorizer/AclAuthorizerTest.scala | 12 +-
.../authorizer/AclAuthorizerWithZkSaslTest.scala | 12 +-
.../AuthorizerInterfaceDefaultTest.scala | 10 +-
.../delegation/DelegationTokenManagerTest.scala | 12 +-
.../AddPartitionsToTxnRequestServerTest.scala | 6 +-
.../unit/kafka/server/AdvertiseBrokerTest.scala | 4 +-
.../kafka/server/BrokerEpochIntegrationTest.scala | 10 +-
.../kafka/server/ControllerMutationQuotaTest.scala | 7 +-
.../DelegationTokenRequestsOnPlainTextTest.scala | 6 +-
.../kafka/server/DelegationTokenRequestsTest.scala | 6 +-
...nTokenRequestsWithDisableTokenFeatureTest.scala | 6 +-
.../kafka/server/DescribeClusterRequestTest.scala | 7 +-
.../unit/kafka/server/DynamicConfigTest.scala | 4 +-
.../FetchRequestDownConversionConfigTest.scala | 6 +-
.../kafka/server/FetchRequestMaxBytesTest.scala | 6 +-
.../FinalizedFeatureChangeListenerTest.scala | 5 +-
.../server/KafkaMetricReporterClusterIdTest.scala | 10 +-
.../KafkaMetricReporterExceptionHandlingTest.scala | 6 +-
.../kafka/server/KafkaMetricsReporterTest.scala | 10 +-
.../scala/unit/kafka/server/KafkaServerTest.scala | 4 +-
.../unit/kafka/server/LeaderElectionTest.scala | 10 +-
.../unit/kafka/server/LogDirFailureTest.scala | 6 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 10 +-
.../unit/kafka/server/MetadataRequestTest.scala | 6 +-
.../server/MetadataRequestWithForwardingTest.scala | 6 +-
.../unit/kafka/server/OffsetFetchRequestTest.scala | 8 +-
.../scala/unit/kafka/server/ReplicaFetchTest.scala | 10 +-
.../unit/kafka/server/ReplicationQuotasTest.scala | 4 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 6 +-
.../kafka/server/ServerGenerateBrokerIdTest.scala | 10 +-
.../kafka/server/ServerGenerateClusterIdTest.scala | 10 +-
.../unit/kafka/server/ServerShutdownTest.scala | 10 +-
.../unit/kafka/server/ServerStartupTest.scala | 4 +-
.../TopicIdWithOldInterBrokerProtocolTest.scala | 6 +-
...chDrivenReplicationProtocolAcceptanceTest.scala | 10 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 4 +-
.../unit/kafka/utils/ReplicationUtilsTest.scala | 9 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +-
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 6 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 10 +-
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 101 ------
.../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 14 +-
.../apache/kafka/controller/QuorumController.java | 4 +-
.../TopicBasedRemoteLogMetadataManagerHarness.java | 3 +-
109 files changed, 946 insertions(+), 539 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b3b8e83..ad549ff 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -139,7 +139,7 @@ class BrokerServer(
var clientQuotaMetadataManager: ClientQuotaMetadataManager = null
- private var _brokerTopicStats: BrokerTopicStats = null
+ @volatile var brokerTopicStats: BrokerTopicStats = null
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
@@ -155,8 +155,6 @@ class BrokerServer(
def kafkaYammerMetrics: kafka.metrics.KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
- private[kafka] def brokerTopicStats = _brokerTopicStats
-
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
try {
@@ -178,7 +176,7 @@ class BrokerServer(
def replicaManager: ReplicaManager = _replicaManager
- def startup(): Unit = {
+ override def startup(): Unit = {
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
try {
info("Starting broker")
@@ -190,7 +188,7 @@ class BrokerServer(
kafkaScheduler.startup()
/* register broker metrics */
- _brokerTopicStats = new BrokerTopicStats
+ brokerTopicStats = new BrokerTopicStats
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
@@ -543,7 +541,7 @@ class BrokerServer(
}
}
- def awaitShutdown(): Unit = {
+ override def awaitShutdown(): Unit = {
lock.lock()
try {
while (true) {
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index d1a2800..f4c6abc 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -22,6 +22,7 @@ import kafka.coordinator.group.GroupCoordinator
import kafka.log.LogManager
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import kafka.network.SocketServer
+import kafka.security.CredentialProvider
import kafka.utils.KafkaScheduler
import org.apache.kafka.common.ClusterResource
import org.apache.kafka.common.internals.ClusterResourceListeners
@@ -82,7 +83,11 @@ trait KafkaBroker extends KafkaMetricsGroup {
def metadataCache: MetadataCache
def groupCoordinator: GroupCoordinator
def boundPort(listenerName: ListenerName): Int
+ def startup(): Unit
+ def awaitShutdown(): Unit
def shutdown(): Unit
+ def brokerTopicStats: BrokerTopicStats
+ def credentialProvider: CredentialProvider
// For backwards compatibility, we need to keep older metrics tied
// to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 19ede1b..88ccb6d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -155,7 +155,7 @@ class KafkaServer(
}.toMap
private var _clusterId: String = null
- private var _brokerTopicStats: BrokerTopicStats = null
+ @volatile var _brokerTopicStats: BrokerTopicStats = null
private var _featureChangeListener: FinalizedFeatureChangeListener = null
@@ -169,7 +169,7 @@ class KafkaServer(
// Visible for testing
private[kafka] def zkClient = _zkClient
- private[kafka] def brokerTopicStats = _brokerTopicStats
+ override def brokerTopicStats = _brokerTopicStats
private[kafka] def featureChangeListener = _featureChangeListener
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 4d4323f..4d9c6b7 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -23,6 +23,7 @@ import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
+import kafka.utils.EmptyTestInfo;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
@@ -252,7 +253,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public void start() {
if (started.compareAndSet(false, true)) {
- clusterReference.get().setUp();
+ clusterReference.get().setUp(new EmptyTestInfo());
}
}
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index c937030..6e20188 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -36,8 +36,8 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
var adminClient: Admin = null
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
createTopic(topicName, 1, 1.toShort)
produceMessages()
adminClient = Admin.create(Map[String, Object](
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 55123f3..2969a95 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -24,7 +24,7 @@ import kafka.api.KAFKA_2_7_IV1
import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkIsrManager}
import kafka.utils.Implicits._
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.ConfigResource
@@ -39,7 +39,7 @@ import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
@Timeout(300)
-class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
+class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
var cluster: ReassignPartitionsTestCluster = null
diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index a21f3ac..56bc47c 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.server.{BaseRequestTest, KafkaConfig}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, Buffer}
@@ -74,8 +74,8 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
// create the test topic with all the brokers as replicas
createTopic(topic, 2, brokerCount)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 1ba56ea..61018bb 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationE
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.policy.AlterConfigPolicy
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -43,8 +43,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
val brokerCount = 3
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e5f095d..fbb6f79 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,6 +18,7 @@ import java.util
import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}
+
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
import kafka.log.LogConfig
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
@@ -60,11 +61,11 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid, requests}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-
import java.util.Collections.singletonList
+
import scala.annotation.nowarn
import scala.collection.mutable
import scala.collection.mutable.Buffer
@@ -314,8 +315,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
)
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = false)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = false)
// Allow inter-broker communication
addAndVerifyAcls(Set(new AccessControlEntry(brokerPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW)), clusterResource)
diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 7525cd2..9c8e32f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{TopicExistsException, UnknownTopicOrParti
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -51,8 +51,8 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
var client: Admin = _
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
waitUntilBrokerMetadataIsPropagated(servers)
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 1f7e9d1..8c2b6da 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
@@ -56,8 +56,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
private val numRecords = 100
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT)
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 34ea07e..9f73236 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -74,8 +74,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
var quotaTestClients: QuotaTestClients = _
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val numPartitions = 1
val leaders = createTopic(topic1, numPartitions, brokerCount)
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 28b0b15..e4cebe2 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,7 +16,7 @@ package kafka.api
import kafka.server.KafkaServer
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
class ClientIdQuotaTest extends BaseQuotaTest {
@@ -24,8 +24,8 @@ class ClientIdQuotaTest extends BaseQuotaTest {
override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
quotaTestClients.alterClientQuotas(
quotaTestClients.clientQuotaAlteration(
quotaTestClients.clientQuotaEntity(None, Some(QuotaTestClients.DefaultEntity)),
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 90748e7..9fc8727f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -299,6 +299,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* Then, 1 consumer should be left out of the group.
*/
@Test
+ @Disabled // To be re-enabled once we fix KAFKA-13421
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
val group = "group-max-size-test"
val topic = "group-max-size-test"
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
index 9335eac..e0853e4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
@@ -24,7 +24,7 @@ import java.util.Collections
import kafka.api
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{EmptyTestInfo, TestUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
@@ -41,7 +41,7 @@ class ConsumerTopicCreationTest {
@MethodSource(Array("parameters"))
def testAutoTopicCreation(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean): Unit = {
val testCase = new ConsumerTopicCreationTest.TestCase(brokerAutoTopicCreationEnable, consumerAllowAutoCreateTopics)
- testCase.setUp()
+ testCase.setUp(new EmptyTestInfo())
try testCase.test() finally testCase.tearDown()
}
@@ -49,7 +49,7 @@ class ConsumerTopicCreationTest {
@MethodSource(Array("parameters"))
def testAutoTopicCreationWithForwarding(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean): Unit = {
val testCase = new api.ConsumerTopicCreationTest.TestCaseWithForwarding(brokerAutoTopicCreationEnable, consumerAllowAutoCreateTopics)
- testCase.setUp()
+ testCase.setUp(new EmptyTestInfo())
try testCase.test() finally testCase.tearDown()
}
}
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index bfc967a..201fc4b 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.server.quota._
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
@@ -61,13 +61,13 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
val defaultConsumeQuota = 1000 * 1000 * 1000
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
this.serverConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp, classOf[GroupedUserQuotaCallback].getName)
this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}",
classOf[GroupedUserPrincipalBuilder].getName)
this.serverConfig.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
- super.setUp()
+ super.setUp(testInfo)
brokerList = TestUtils.bootstrapServers(servers, listenerName)
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index a2db015..7336055 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -17,6 +17,7 @@
package kafka.api
import java.util.Properties
+
import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.zk.ConfigEntityChangeNotificationZNode
@@ -26,7 +27,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -98,9 +99,9 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
}
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
- super.setUp()
+ super.setUp(testInfo)
privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
}
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 22fee47..0a46972 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.Authorizer
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -107,9 +107,9 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
}
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
client = Admin.create(createConfig())
}
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index df60010..cbb536b 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -54,14 +54,14 @@ import scala.jdk.CollectionConverters._
* extends IntegrationTestHarness. IntegrationTestHarness creates producers and
* consumers, and it extends KafkaServerTestHarness. KafkaServerTestHarness starts
* brokers, but first it initializes a ZooKeeper server and client, which happens
- * in ZooKeeperTestHarness.
+ * in QuorumTestHarness.
*
* To start brokers we need to set a cluster ACL, which happens optionally in KafkaServerTestHarness.
* The remaining ACLs to enable access to producers and consumers are set here. To set ACLs, we use AclCommand directly.
*
* Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
- * SaslTestHarness here directly because it extends ZooKeeperTestHarness, and we
- * would end up with ZooKeeperTestHarness twice.
+ * SaslTestHarness here directly because it extends QuorumTestHarness, and we
+ * would end up with QuorumTestHarness twice.
*/
abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
override val brokerCount = 3
@@ -198,8 +198,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* Starts MiniKDC and only then sets up the parent trait.
*/
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
servers.foreach { s =>
TestUtils.waitAndVerifyAcls(ClusterActionAndClusterAlterAcls, s.dataPlaneRequestProcessor.authorizer.get, clusterResource)
TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(TOPIC, "*", LITERAL))
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index e41efce..7b05113 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
import org.apache.kafka.test.{TestUtils => _, _}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import org.apache.kafka.test.TestUtils.isValidClusterId
@@ -106,8 +106,8 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
MockDeserializer.resetStaticVariables
// create the consumer offset topic
createTopic(topic, 2, serverCount)
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 4f76735..cedfc17 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -14,6 +14,7 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
+
import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
@@ -30,7 +31,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -75,8 +76,8 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = false)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = false)
// Allow inter-broker communication
TestUtils.addAndVerifyAcls(servers.head,
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 83c1cac..8989044 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -30,7 +30,7 @@ import kafka.integration.KafkaServerTestHarness
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import scala.collection.mutable
import scala.collection.Seq
@@ -58,10 +58,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
override def generateConfigs: Seq[KafkaConfig] = {
- val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
configureListeners(cfgs)
modifyConfigs(cfgs)
+ insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}
@@ -75,22 +76,45 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val listenerSecurityMap = listenerNames.map(listenerName => s"${listenerName.value}:${securityProtocol.name}").mkString(",")
config.setProperty(KafkaConfig.ListenersProp, listeners)
+ config.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityMap)
}
}
+ private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit = {
+ if (isKRaftTest()) {
+ props.foreach { config =>
+ // Add the CONTROLLER listener to "listeners" if it is not already there.
+ // But do not add it to advertised.listeners.
+ val listeners = config.getProperty(KafkaConfig.ListenersProp, "").split(",")
+ val listenerNames = listeners.map(_.replaceFirst(":.*", ""))
+ if (!listenerNames.contains("CONTROLLER")) {
+ config.setProperty(KafkaConfig.ListenersProp,
+ (listeners ++ Seq("CONTROLLER://localhost:0")).mkString(","))
+ }
+ // Add a security protocol for the CONTROLLER endpoint, if one is not already set.
+ val securityPairs = config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
+ if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
+ config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
+ (securityPairs ++ Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
+ }
+ }
+ }
+ }
+
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = true)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = true)
}
- def doSetup(createOffsetsTopic: Boolean): Unit = {
+ def doSetup(testInfo: TestInfo,
+ createOffsetsTopic: Boolean): Unit = {
// Generate client security properties before starting the brokers in case certs are needed
producerConfig ++= clientSecurityProps("producer")
consumerConfig ++= clientSecurityProps("consumer")
adminClientConfig ++= clientSecurityProps("adminClient")
- super.setUp()
+ super.setUp(testInfo)
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1")
@@ -105,8 +129,13 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
- if (createOffsetsTopic)
- TestUtils.createOffsetsTopic(zkClient, servers)
+ if (createOffsetsTopic) {
+ if (isKRaftTest()) {
+ TestUtils.createOffsetsTopicWithAdmin(brokers, adminClientConfig)
+ } else {
+ TestUtils.createOffsetsTopic(zkClient, servers)
+ }
+ }
}
def clientSecurityProps(certAlias: String): Properties = {
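Note on the IntegrationTestHarness change above: the listener handling in insertControllerListenersIfNeeded can be tried in isolation with the minimal sketch below. It is not the harness code. The object name, the method name, and the literal property keys (assumed stand-ins for KafkaConfig.ListenersProp and KafkaConfig.ListenerSecurityProtocolMapProp) are illustrative assumptions, and the controller security protocol is passed as a plain string rather than read from controllerListenerSecurityProtocol. One deliberate difference: the sketch drops empty entries, because "".split(",") yields a single empty string.

```scala
import java.util.Properties

object ControllerListenerSketch {
  // Assumed literal keys; the real harness uses KafkaConfig constants.
  val ListenersProp = "listeners"
  val SecurityMapProp = "listener.security.protocol.map"

  // Split a comma-separated property into entries, dropping the empty
  // entry that "".split(",") would otherwise produce.
  private def entries(config: Properties, key: String): Seq[String] =
    config.getProperty(key, "").split(",").toSeq.filter(_.nonEmpty)

  // Hypothetical helper mirroring the two steps in the diff above.
  def insertControllerListener(config: Properties, controllerProtocol: String): Unit = {
    // Step 1: add a CONTROLLER endpoint to "listeners" if none is present.
    val listeners = entries(config, ListenersProp)
    val listenerNames = listeners.map(_.replaceFirst(":.*", ""))
    if (!listenerNames.contains("CONTROLLER")) {
      config.setProperty(ListenersProp,
        (listeners :+ "CONTROLLER://localhost:0").mkString(","))
    }
    // Step 2: map CONTROLLER to a security protocol if it is not mapped yet.
    val securityPairs = entries(config, SecurityMapProp)
    if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
      config.setProperty(SecurityMapProp,
        (securityPairs :+ s"CONTROLLER:$controllerProtocol").mkString(","))
    }
  }
}
```

Calling the helper twice on the same Properties object leaves it unchanged the second time, since both steps check for an existing CONTROLLER entry before appending. As in the diff, nothing is added to advertised.listeners.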
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index d0007fb..6f397d8 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -22,7 +22,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.TimestampType
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
/**
@@ -41,8 +41,8 @@ class LogAppendTimeTest extends IntegrationTestHarness {
private val topic = "topic"
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
createTopic(topic)
}
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index c5c17c2..850ac89 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import scala.annotation.nowarn
@@ -54,10 +54,10 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
- super.setUp()
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index ef8174d0..94fc04c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
+
import kafka.log.LogConfig
import kafka.security.authorizer.AclEntry
import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
@@ -43,7 +44,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import org.slf4j.LoggerFactory
import scala.annotation.nowarn
@@ -70,8 +71,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
private val changedBrokerLoggers = scala.collection.mutable.Set[String]()
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
brokerLoggerConfigResource = new ConfigResource(
ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString)
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index c44028c..8e69a2d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -20,7 +20,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.apache.kafka.common.errors.TopicAuthorizationException
@@ -71,9 +71,9 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(List.empty, None, ZkSasl))
- super.setUp()
+ super.setUp(testInfo)
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 9ea0bb5..62b2689 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,19 +19,19 @@ package kafka.api.test
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
-class ProducerCompressionTest extends ZooKeeperTestHarness {
+class ProducerCompressionTest extends QuorumTestHarness {
private val brokerId = 0
private val topic = "topic"
@@ -40,8 +40,8 @@ class ProducerCompressionTest extends ZooKeeperTestHarness {
private var server: KafkaServer = null
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
server = TestUtils.createServer(KafkaConfig.fromProps(props))
}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 1adb26c..4d45da9 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
@@ -60,8 +60,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val topic2 = "topic-2"
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
producer1 = TestUtils.createProducer(brokerList, acks = 0, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 2a7d5c9..a9f2c6c 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.SaslAuthenticationException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
import kafka.server.KafkaConfig
@@ -63,10 +63,10 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
}
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
createTopic(topic, numPartitions, brokerCount)
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index e425aba..e406450 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException}
-import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
import scala.collection.immutable.List
@@ -34,7 +34,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
protected def kafkaServerSaslMechanisms: List[String]
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
// create static config including client login context with credentials for JaasTestUtils 'client2'
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
// set dynamic properties with credentials for JaasTestUtils 'client1' so that dynamic JAAS configuration is also
@@ -43,7 +43,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
- super.setUp()
+ super.setUp(testInfo)
}
/**
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index eb2eab4..37b36e1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -15,7 +15,7 @@ package kafka.api
import java.io.File
import kafka.server.KafkaConfig
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -31,10 +31,10 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index d3ca141..042aa3d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -19,7 +19,7 @@ import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
override protected def listenerName = new ListenerName("CLIENT")
@@ -28,7 +28,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
- // disable secure acls of zkClient in ZooKeeperTestHarness
+ // disable secure acls of zkClient in QuorumTestHarness
override protected def zkAclsEnabled = Some(false)
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
@@ -36,9 +36,9 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName))
- super.setUp()
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
index b7eb39b..0933818 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
@@ -14,15 +14,15 @@ package kafka.api
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index 022b2f4..6e334d1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.test.TestSslUtils
import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -52,8 +52,8 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override def createPrivilegedAdminClient() = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword)
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
// Create client credentials after starting brokers so that dynamic credential creation is also tested
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 53a267f..3cb424d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -14,6 +14,7 @@ package kafka.api
import java.io.File
import java.util
+
import kafka.log.LogConfig
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
@@ -33,9 +34,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer.Authorizer
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util.Collections
+
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
@@ -62,9 +63,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
setUpSasl()
- super.setUp()
+ super.setUp(testInfo)
}
def setUpSasl(): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 2b1db11..563481d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -17,7 +17,7 @@ import java.io.File
import kafka.server.KafkaConfig
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
@@ -25,9 +25,9 @@ class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index 65bef41..850d99f 100644
--- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.network.Mode
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Java
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
object SslEndToEndAuthorizationTest {
class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
@@ -70,9 +70,9 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, s"O=A client,CN=$clientCn")
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(List.empty, None, ZkSasl))
- super.setUp()
+ super.setUp(testInfo)
}
override def clientSecurityProps(certAlias: String): Properties = {
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index ecdf435..1a656ef 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.errors.InvalidPidMappingException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -46,8 +46,8 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
producer = TestUtils.createTransactionalProducer("transactionalProducer", servers)
consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 5afad85..1fbba9e 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -59,8 +59,8 @@ class TransactionsTest extends KafkaServerTestHarness {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
createTopic(topic1, numPartitions, numServers, topicConfig)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index 20094b6..267792f 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.Seq
import scala.collection.mutable.Buffer
@@ -51,8 +51,8 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 1.toString)
createTopic(topic1, numPartitions, numServers, topicConfig)
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index e442dda..83c70da 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -19,7 +19,7 @@ import java.io.File
import kafka.server._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Sanitizer
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
class UserClientIdQuotaTest extends BaseQuotaTest {
@@ -30,9 +30,9 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
this.serverConfig.setProperty(KafkaConfig.SslClientAuthProp, "required")
- super.setUp()
+ super.setUp(testInfo)
quotaTestClients.alterClientQuotas(
quotaTestClients.clientQuotaAlteration(
quotaTestClients.clientQuotaEntity(Some(QuotaTestClients.DefaultEntity), Some(QuotaTestClients.DefaultEntity)),
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index 7a00f40..ffbaebb 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -19,7 +19,7 @@ import java.io.File
import kafka.server.KafkaServer
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
class UserQuotaTest extends BaseQuotaTest with SaslSetup {
@@ -31,9 +31,9 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
quotaTestClients.alterClientQuotas(
quotaTestClients.clientQuotaAlteration(
quotaTestClients.clientQuotaEntity(Some(QuotaTestClients.DefaultEntity), None),
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 98bc641..14f3e8f 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{KafkaException, requests}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -56,8 +56,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
TestUtils.createTopic(zkClient, topic, brokerCount, brokerCount, servers)
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 84e8099..0c0950a 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -35,9 +35,10 @@ import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
import kafka.metrics.KafkaYammerMetrics
import kafka.network.{Processor, RequestChannel}
+import kafka.server.QuorumTestHarness
import kafka.utils._
import kafka.utils.Implicits._
-import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
@@ -61,7 +62,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import scala.annotation.nowarn
import scala.collection._
@@ -74,7 +75,7 @@ object DynamicBrokerReconfigurationTest {
val SecureExternal = "EXTERNAL"
}
-class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
+class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup {
import DynamicBrokerReconfigurationTest._
@@ -101,9 +102,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
- super.setUp()
+ super.setUp(testInfo)
clearLeftOverProcessorMetrics() // clear metrics left over from other tests so that new ones can be tested
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 5c75507..2bda3ac 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
import org.apache.kafka.common.security.kerberos.KerberosLogin
import org.apache.kafka.common.utils.{LogContext, MockTime}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -60,12 +60,12 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
private val failedAuthenticationDelayMs = 2000
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
TestableKerberosLogin.reset()
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp, failedAuthenticationDelayMs.toString)
- super.setUp()
+ super.setUp(testInfo)
serverAddr = new InetSocketAddress("localhost",
servers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 9709148..1d865f9 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -27,14 +27,14 @@ import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.Implicits._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -49,7 +49,7 @@ object MultipleListenersWithSameSecurityProtocolBaseTest {
val Plain = "PLAIN"
}
-abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeeperTestHarness with SaslSetup {
+abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumTestHarness with SaslSetup {
import MultipleListenersWithSameSecurityProtocolBaseTest._
@@ -67,9 +67,9 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
protected def dynamicJaasSections: Properties
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(staticJaasSections)
- super.setUp()
+ super.setUp(testInfo)
// 2 brokers so that we can test that the data propagates correctly via UpdateMetadataRequest
val numServers = 2
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
new file mode 100755
index 0000000..69be4f5
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.net.InetSocketAddress
+import java.util
+import java.util.{Collections, Properties}
+import java.util.concurrent.CompletableFuture
+
+import javax.security.auth.login.Configuration
+import kafka.raft.KafkaRaftManager
+import kafka.tools.StorageTool
+import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
+import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.zookeeper.client.ZKClientConfig
+import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
+
+import scala.collection.{Seq, immutable}
+
+trait QuorumImplementation {
+ def createAndStartBroker(config: KafkaConfig,
+ time: Time): KafkaBroker
+
+ def shutdown(): Unit
+}
+
+class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
+ val zkClient: KafkaZkClient,
+ val adminZkClient: AdminZkClient,
+ val log: Logging) extends QuorumImplementation {
+ override def createAndStartBroker(config: KafkaConfig,
+ time: Time): KafkaBroker = {
+ val server = new KafkaServer(config, time, None, false)
+ server.startup()
+ server
+ }
+
+ override def shutdown(): Unit = {
+ CoreUtils.swallow(zkClient.close(), log)
+ CoreUtils.swallow(zookeeper.shutdown(), log)
+ }
+}
+
+class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndVersion],
+ val controllerServer: ControllerServer,
+ val metadataDir: File,
+ val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+ val clusterId: String,
+ val log: Logging) extends QuorumImplementation {
+ override def createAndStartBroker(config: KafkaConfig,
+ time: Time): KafkaBroker = {
+ val broker = new BrokerServer(config = config,
+ metaProps = new MetaProperties(clusterId, config.nodeId),
+ raftManager = raftManager,
+ time = time,
+ metrics = new Metrics(),
+ threadNamePrefix = Some("Broker%02d_".format(config.nodeId)),
+ initialOfflineDirs = Seq(),
+ controllerQuorumVotersFuture = controllerQuorumVotersFuture,
+ supportedFeatures = Collections.emptyMap())
+ broker.startup()
+ broker
+ }
+
+ override def shutdown(): Unit = {
+ CoreUtils.swallow(raftManager.shutdown(), log)
+ CoreUtils.swallow(controllerServer.shutdown(), log)
+ }
+}
+
+@Tag("integration")
+abstract class QuorumTestHarness extends Logging {
+ val zkConnectionTimeout = 10000
+ val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
+ val zkMaxInFlightRequests = Int.MaxValue
+
+ protected def zkAclsEnabled: Option[Boolean] = None
+
+ /**
+ * When in KRaft mode, the security protocol to use for the controller listener.
+ * Can be overridden by subclasses.
+ */
+ protected def controllerListenerSecurityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+
+ protected def kraftControllerConfigs(): Seq[Properties] = {
+ Seq(new Properties())
+ }
+
+ private var implementation: QuorumImplementation = null
+
+ def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorumImplementation]
+
+ def checkIsZKTest(): Unit = {
+ if (isKRaftTest()) {
+ throw new RuntimeException("This function can't be accessed when running the test " +
+ "in KRaft mode. ZooKeeper mode is required.")
+ }
+ }
+
+ def checkIsKRaftTest(): Unit = {
+ if (!isKRaftTest()) {
+ throw new RuntimeException("This function can't be accessed when running the test " +
+ "in ZooKeeper mode. KRaft mode is required.")
+ }
+ }
+
+ private def asZk(): ZooKeeperQuorumImplementation = {
+ checkIsZKTest()
+ implementation.asInstanceOf[ZooKeeperQuorumImplementation]
+ }
+
+ private def asKRaft(): KRaftQuorumImplementation = {
+ checkIsKRaftTest()
+ implementation.asInstanceOf[KRaftQuorumImplementation]
+ }
+
+ def zookeeper: EmbeddedZookeeper = asZk().zookeeper
+
+ def zkClient: KafkaZkClient = asZk().zkClient
+
+ def zkClientOrNull: KafkaZkClient = if (isKRaftTest()) null else asZk().zkClient
+
+ def adminZkClient: AdminZkClient = asZk().adminZkClient
+
+ def zkPort: Int = asZk().zookeeper.port
+
+ def zkConnect: String = s"127.0.0.1:$zkPort"
+
+ def zkConnectOrNull: String = if (isKRaftTest()) null else zkConnect
+
+ def controllerServer: ControllerServer = asKRaft().controllerServer
+
+ // Note: according to the junit documentation: "JUnit Jupiter does not guarantee the execution
+ // order of multiple @BeforeEach methods that are declared within a single test class or test
+ // interface." Therefore, if you have things you would like to do before each test case runs, it
+ // is best to override this function rather than declaring a new @BeforeEach function.
+ // That way you control the initialization order.
+ @BeforeEach
+ def setUp(testInfo: TestInfo): Unit = {
+ val name = if (testInfo.getTestMethod().isPresent()) {
+ testInfo.getTestMethod().get().toString()
+ } else {
+ "[unspecified]"
+ }
+ if (TestInfoUtils.isKRaft(testInfo)) {
+ info(s"Running KRAFT test ${name}")
+ implementation = newKRaftQuorum(testInfo)
+ } else {
+ info(s"Running ZK test ${name}")
+ implementation = newZooKeeperQuorum()
+ }
+ }
+
+ def createAndStartBroker(config: KafkaConfig,
+ time: Time = Time.SYSTEM): KafkaBroker = {
+ implementation.createAndStartBroker(config,
+ time)
+ }
+
+ def shutdownZooKeeper(): Unit = asZk().shutdown()
+
+ private def formatDirectories(directories: immutable.Seq[String],
+ metaProperties: MetaProperties): Unit = {
+ val stream = new ByteArrayOutputStream()
+ var out: PrintStream = null
+ try {
+ out = new PrintStream(stream)
+ if (StorageTool.formatCommand(out, directories, metaProperties, false) != 0) {
+ throw new RuntimeException(stream.toString())
+ }
+ debug(s"Formatted storage directory(ies) ${directories}")
+ } finally {
+ if (out != null) out.close()
+ stream.close()
+ }
+ }
+
+ private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
+ val clusterId = Uuid.randomUuid().toString
+ val metadataDir = TestUtils.tempDir()
+ val metaProperties = new MetaProperties(clusterId, 0)
+ formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), metaProperties)
+ val controllerMetrics = new Metrics()
+ val propsList = kraftControllerConfigs()
+ if (propsList.size != 1) {
+ throw new RuntimeException("Only one KRaft controller is supported for now.")
+ }
+ val props = propsList(0)
+ props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ props.setProperty(KafkaConfig.NodeIdProp, "1000")
+ props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath())
+ val proto = controllerListenerSecurityProtocol.toString()
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${proto}:${proto}")
+ props.setProperty(KafkaConfig.ListenersProp, s"${proto}://localhost:0")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, proto)
+ props.setProperty(KafkaConfig.QuorumVotersProp, "1000@localhost:0")
+ val config = new KafkaConfig(props)
+ val threadNamePrefix = "Controller_" + testInfo.getDisplayName
+ val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, AddressSpec]]
+ val raftManager = new KafkaRaftManager(
+ metaProperties = metaProperties,
+ config = config,
+ recordSerde = MetadataRecordSerde.INSTANCE,
+ topicPartition = new TopicPartition(KafkaRaftServer.MetadataTopic, 0),
+ topicId = KafkaRaftServer.MetadataTopicId,
+ time = Time.SYSTEM,
+ metrics = controllerMetrics,
+ threadNamePrefixOpt = Option(threadNamePrefix),
+ controllerQuorumVotersFuture = controllerQuorumVotersFuture)
+ var controllerServer: ControllerServer = null
+ try {
+ controllerServer = new ControllerServer(
+ metaProperties = metaProperties,
+ config = config,
+ raftManager = raftManager,
+ time = Time.SYSTEM,
+ metrics = controllerMetrics,
+ threadNamePrefix = Option(threadNamePrefix),
+ controllerQuorumVotersFuture = controllerQuorumVotersFuture)
+ controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
+ if (e != null) {
+ error("Error completing controller socket server future", e)
+ controllerQuorumVotersFuture.completeExceptionally(e)
+ } else {
+ controllerQuorumVotersFuture.complete(Collections.singletonMap(1000,
+ new InetAddressSpec(new InetSocketAddress("localhost", port))))
+ }
+ })
+ controllerServer.startup()
+ raftManager.startup()
+ controllerServer.startup()
+ } catch {
+ case e: Throwable =>
+ CoreUtils.swallow(raftManager.shutdown(), this)
+ if (controllerServer != null) CoreUtils.swallow(controllerServer.shutdown(), this)
+ throw e
+ }
+ new KRaftQuorumImplementation(raftManager,
+ controllerServer,
+ metadataDir,
+ controllerQuorumVotersFuture,
+ clusterId,
+ this)
+ }
+
+ private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = {
+ val zookeeper = new EmbeddedZookeeper()
+ var zkClient: KafkaZkClient = null
+ var adminZkClient: AdminZkClient = null
+ try {
+ zkClient = KafkaZkClient(s"127.0.0.1:${zookeeper.port}",
+ zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
+ zkSessionTimeout,
+ zkConnectionTimeout,
+ zkMaxInFlightRequests,
+ Time.SYSTEM,
+ name = "ZooKeeperTestHarness",
+ new ZKClientConfig)
+ adminZkClient = new AdminZkClient(zkClient)
+ } catch {
+ case t: Throwable =>
+ CoreUtils.swallow(zookeeper.shutdown(), this)
+ if (zkClient != null) CoreUtils.swallow(zkClient.close(), this)
+ throw t
+ }
+ new ZooKeeperQuorumImplementation(zookeeper,
+ zkClient,
+ adminZkClient,
+ this)
+ }
+
+ @AfterEach
+ def tearDown(): Unit = {
+ if (implementation != null) {
+ implementation.shutdown()
+ }
+ Configuration.setConfiguration(null)
+ }
+
+ // Trigger session expiry by reusing the session id in another client
+ def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
+ val dummyWatcher = new Watcher {
+ override def process(event: WatchedEvent): Unit = {}
+ }
+ val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
+ zooKeeper.getSessionId,
+ zooKeeper.getSessionPasswd)
+ assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+ anotherZkClient
+ }
+}
+
+object QuorumTestHarness {
+ val ZkClientEventThreadSuffix = "-EventThread"
+
+ /**
+ * Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread.
+ * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeAll,
+ * which is true for core tests where this harness is used.
+ */
+ @BeforeAll
+ def setUpClass(): Unit = {
+ TestUtils.verifyNoUnexpectedThreads("@BeforeAll")
+ }
+
+ /**
+ * Verify that tests from the current test class using QuorumTestHarness haven't left behind an unexpected thread
+ */
+ @AfterAll
+ def tearDownClass(): Unit = {
+ TestUtils.verifyNoUnexpectedThreads("@AfterAll")
+ }
+}
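The mode-guard pattern used by QuorumTestHarness above (one private implementation field, accessors that throw in the wrong mode, and "OrNull" variants for callers that can tolerate either mode) can be sketched without any Kafka or JUnit dependency. This is only an illustrative sketch: ZkQuorum, KRaftQuorum, MiniHarness, the fixed connect string and the cluster id are hypothetical stand-ins, not the classes added by this commit.

```scala
// Simplified stand-ins for the two quorum implementations (hypothetical names).
sealed trait Quorum {
  def shutdown(): Unit
}

final class ZkQuorum(val connect: String) extends Quorum {
  var closed = false
  override def shutdown(): Unit = { closed = true }
}

final class KRaftQuorum(val clusterId: String) extends Quorum {
  var closed = false
  override def shutdown(): Unit = { closed = true }
}

class MiniHarness {
  private var implementation: Quorum = null

  // Pick the implementation from the test's display name, as the harness does.
  def setUp(displayName: String): Unit = {
    implementation =
      if (displayName.contains("quorum=kraft")) new KRaftQuorum("test-cluster")
      else new ZkQuorum("127.0.0.1:2181")
  }

  def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorum]

  // Guarded downcast: fails loudly when a ZK-only accessor is used in KRaft mode.
  private def asZk(): ZkQuorum = implementation match {
    case zk: ZkQuorum => zk
    case _ => throw new RuntimeException("ZooKeeper mode is required.")
  }

  def zkConnect: String = asZk().connect

  // Variant for shared code paths that must work in both modes.
  def zkConnectOrNull: String = if (isKRaftTest()) null else zkConnect

  def tearDown(): Unit = {
    if (implementation != null) implementation.shutdown()
  }
}
```

With this shape, a test that has not yet been converted keeps calling zkConnect and fails with a clear message if it is ever run in KRaft mode, while shared helpers can use zkConnectOrNull.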
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index ef75787..abcbebc 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -30,10 +30,8 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.Exit
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
@@ -44,9 +42,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
val exited = new AtomicBoolean(false)
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
Exit.setExitProcedure((_, _) => exited.set(true))
- super.setUp()
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
index cd611de..796ed36 100644
--- a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
+++ b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
private val topicCount = 4
@@ -38,7 +38,8 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
}.map(KafkaConfig.fromProps)
@BeforeEach
- def createTestAndInternalTopics(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
val props = new Properties()
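The GetOffsetShellTest change above replaces a second @BeforeEach method with an override of setUp, so that topic creation is guaranteed to run after the quorum and brokers exist. A minimal sketch of why a single overridden setUp gives a deterministic order follows; BaseHarness, TopicTest and the recorded event strings are hypothetical and stand in for the real harness classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical base class: records what would be the quorum/broker startup.
class BaseHarness {
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def setUp(displayName: String): Unit = {
    events += "quorum started"
  }
}

// Hypothetical subclass: work placed before and after super.setUp runs in
// exactly that order, which separate @BeforeEach methods would not guarantee.
class TopicTest extends BaseHarness {
  override def setUp(displayName: String): Unit = {
    events += "configure before quorum"
    super.setUp(displayName)
    events += "create topics"
  }
}
```

Calling setUp on a TopicTest instance records the three steps in the order they are written in the override.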
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
new file mode 100644
index 0000000..ecd656e
--- /dev/null
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils
+
+import java.lang.reflect.Method
+import java.util
+import java.util.{Collections, Optional}
+
+import org.junit.jupiter.api.TestInfo
+
+class EmptyTestInfo extends TestInfo {
+ override def getDisplayName: String = ""
+ override def getTags: util.Set[String] = Collections.emptySet()
+ override def getTestClass: (Optional[Class[_]]) = Optional.empty()
+ override def getTestMethod: Optional[Method] = Optional.empty()
+}
+
+object TestInfoUtils {
+ def isKRaft(testInfo: TestInfo): Boolean = {
+ if (testInfo.getDisplayName().contains("quorum=")) {
+ if (testInfo.getDisplayName().contains("quorum=kraft")) {
+ true
+ } else if (testInfo.getDisplayName().contains("quorum=zk")) {
+ false
+ } else {
+ throw new RuntimeException("Unknown quorum value")
+ }
+ } else {
+ false
+ }
+ }
+}
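The decision made by TestInfoUtils.isKRaft above depends only on the test's display name, so it can be exercised without JUnit. The sketch below assumes display names of the form "[1] quorum=zk" for parameterized tests; that format is an assumption about JUnit's default naming, and QuorumDisplayName is a hypothetical helper that is not part of this commit.

```scala
object QuorumDisplayName {
  // Mirrors the branches of TestInfoUtils.isKRaft, but takes the display
  // name as a plain String so it can run without a TestInfo instance.
  def isKRaft(displayName: String): Boolean = {
    if (!displayName.contains("quorum=")) {
      // No quorum argument: the test keeps running in ZK mode.
      false
    } else if (displayName.contains("quorum=kraft")) {
      true
    } else if (displayName.contains("quorum=zk")) {
      false
    } else {
      throw new RuntimeException(s"Unknown quorum value in display name: $displayName")
    }
  }
}
```

For example, QuorumDisplayName.isKRaft("[2] quorum=kraft") is true, while a plain name such as "testFoo()" yields false, which matches the commit's rule that tests without a quorum argument stay in ZK mode.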
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 3f72afd..42230e7 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -23,10 +23,10 @@ import java.nio.file.{Files, StandardOpenOption}
import javax.imageio.ImageIO
import kafka.admin.ReassignPartitionsCommand
-import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
+import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness, QuotaType}
import kafka.utils.TestUtils._
-import kafka.utils.{Exit, Logging, TestUtils}
-import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
+import kafka.utils.{EmptyTestInfo, Exit, Logging, TestUtils}
+import kafka.zk.ReassignPartitionsZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -80,7 +80,7 @@ object ReplicationQuotasTestRig {
def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean): Unit = {
val experiment = new Experiment()
try {
- experiment.setUp()
+ experiment.setUp(new EmptyTestInfo())
experiment.run(config, journal, displayChartsOnScreen)
journal.footer()
}
@@ -96,7 +96,7 @@ object ReplicationQuotasTestRig {
val targetBytesPerBrokerMB: Long = msgsPerPartition.toLong * msgSize.toLong * partitions.toLong / brokers.toLong / 1000000
}
- class Experiment extends ZooKeeperTestHarness with Logging {
+ class Experiment extends QuorumTestHarness with Logging {
val topicName = "my-topic"
var experimentName = "unset"
val partitionId = 0
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index aec78cb..7cd5a18 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -23,7 +23,7 @@ import kafka.admin.AclCommand.AclCommandOptions
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{Exit, LogCaptureAppender, Logging, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType._
@@ -36,9 +36,9 @@ import org.apache.kafka.common.utils.{AppInfoParser, SecurityUtils}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.log4j.Level
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
-class AclCommandTest extends ZooKeeperTestHarness with Logging {
+class AclCommandTest extends QuorumTestHarness with Logging {
var servers: Seq[KafkaServer] = Seq()
@@ -102,8 +102,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private var adminArgs: Array[String] = _
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
brokerProps.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 94c5a77..cd0bb1b 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -49,8 +49,8 @@ class AddPartitionsTest extends BaseRequestTest {
val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
createTopic(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
createTopic(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 299d98b..859b3d5 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -21,9 +21,9 @@ import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.api.ApiVersion
import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig}
+import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig, QuorumTestHarness}
import kafka.utils.{Exit, Logging}
-import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Node
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
@@ -42,7 +42,7 @@ import org.junit.jupiter.api.Test
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
-class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
+class ConfigCommandTest extends QuorumTestHarness with Logging {
@Test
def shouldExitWithNonZeroStatusOnArgError(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 458a3ee..6415c16 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
@@ -52,8 +52,8 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
createTopic(topic, 1, 1)
}
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
index 05fc339..2071d08 100644
--- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionException
@@ -41,9 +41,9 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
override def brokerCount = 1
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
}
override def generateConfigs = {
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4a53c67..bb881f2 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -22,9 +22,9 @@ import java.util.{Collections, Optional, Properties}
import scala.collection.Seq
import kafka.log.UnifiedLog
-import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
+import kafka.zk.TopicPartitionZNode
import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import kafka.common.TopicAlreadyMarkedForDeletionException
@@ -34,7 +34,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import scala.jdk.CollectionConverters._
-class DeleteTopicTest extends ZooKeeperTestHarness {
+class DeleteTopicTest extends QuorumTestHarness {
var servers: Seq[KafkaServer] = Seq()
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
index 19dbd6a..dfb4ae7 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
@@ -80,7 +80,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
}
@BeforeEach
- def setup(info: TestInfo): Unit = {
+ override def setUp(info: TestInfo): Unit = {
+ super.setUp(info)
+
// create adminClient
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 4aa7167..9b7f090 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -17,24 +17,25 @@
package kafka.common
import kafka.utils.TestUtils
-import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore, ZooKeeperTestHarness}
+import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore}
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType.GROUP
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.mutable.ArrayBuffer
import scala.collection.Seq
-class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
+class ZkNodeChangeNotificationListenerTest extends QuorumTestHarness {
private val changeExpirationMs = 1000
private var notificationListener: ZkNodeChangeNotificationListener = _
private var notificationHandler: TestNotificationHandler = _
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
zkClient.createAclPaths()
notificationHandler = new TestNotificationHandler()
}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index bdc06a7..2302007 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -23,7 +23,7 @@ import com.yammer.metrics.core.Timer
import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
import kafka.controller.KafkaController.AlterIsrCallback
import kafka.metrics.KafkaYammerMetrics
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.utils.{LogCaptureAppender, TestUtils}
import kafka.zk.{FeatureZNodeStatus, _}
import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
@@ -33,7 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
import org.apache.log4j.Level
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.mockito.Mockito.{doAnswer, spy, verify}
import org.mockito.invocation.InvocationOnMock
@@ -41,14 +41,14 @@ import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
-class ControllerIntegrationTest extends ZooKeeperTestHarness {
+class ControllerIntegrationTest extends QuorumTestHarness {
var servers = Seq.empty[KafkaServer]
val firstControllerEpoch = KafkaController.InitialControllerEpoch + 1
val firstControllerEpochZkVersion = KafkaController.InitialControllerEpochZkVersion + 1
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
servers = Seq.empty[KafkaServer]
}
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 6fc18f0..e8fdaf5 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -18,16 +18,17 @@
package kafka.integration
import java.io.File
+import java.util
import java.util.Arrays
+import kafka.server.QuorumTestHarness
import kafka.server._
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import scala.collection.Seq
-import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.collection.{Seq, mutable}
+import scala.jdk.CollectionConverters._
import java.util.Properties
import org.apache.kafka.common.{KafkaException, Uuid}
@@ -38,9 +39,25 @@ import org.apache.kafka.common.utils.Time
/**
* A test harness that brings up some number of broker nodes
*/
-abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
+abstract class KafkaServerTestHarness extends QuorumTestHarness {
var instanceConfigs: Seq[KafkaConfig] = null
- var servers: Buffer[KafkaServer] = new ArrayBuffer
+
+ private val _brokers = new mutable.ArrayBuffer[KafkaBroker]
+
+ /**
+ * Get the list of brokers, which could be either BrokerServer objects or KafkaServer objects.
+ */
+ def brokers: mutable.Buffer[KafkaBroker] = _brokers
+
+ /**
+ * Get the list of brokers, as instances of KafkaServer.
+ * This method should only be used when dealing with brokers that use ZooKeeper.
+ */
+ def servers: mutable.Buffer[KafkaServer] = {
+ checkIsZKTest()
+ _brokers.map(_.asInstanceOf[KafkaServer])
+ }
+
var brokerList: String = null
var alive: Array[Boolean] = null
@@ -88,8 +105,8 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
protected def enableForwarding: Boolean = false
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
if (configs.isEmpty)
throw new KafkaException("Must supply at least one server config.")
@@ -100,15 +117,19 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
// Add each broker to `servers` buffer as soon as it is created to ensure that brokers
// are shutdown cleanly in tearDown even if a subsequent broker fails to start
for (config <- configs) {
- servers += TestUtils.createServer(
- config,
- time = brokerTime(config.brokerId),
- threadNamePrefix = None,
- enableForwarding
- )
+ if (isKRaftTest()) {
+ _brokers += createAndStartBroker(config, brokerTime(config.brokerId))
+ } else {
+ _brokers += TestUtils.createServer(
+ config,
+ time = brokerTime(config.brokerId),
+ threadNamePrefix = None,
+ enableForwarding
+ )
+ }
}
- brokerList = TestUtils.bootstrapServers(servers, listenerName)
- alive = new Array[Boolean](servers.length)
+ brokerList = TestUtils.bootstrapServers(_brokers, listenerName)
+ alive = new Array[Boolean](_brokers.length)
Arrays.fill(alive, true)
// default implementation is a no-op, it is overridden by subclasses if required
@@ -117,20 +138,26 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
@AfterEach
override def tearDown(): Unit = {
- if (servers != null) {
- TestUtils.shutdownServers(servers)
- }
+ TestUtils.shutdownServers(_brokers)
super.tearDown()
}
/**
- * Create a topic in ZooKeeper.
+ * Create a topic.
* Wait until the leader is elected and the metadata is propagated to all brokers.
* Return the leader for each partition.
*/
- def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
- topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] =
- TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
+ def createTopic(topic: String,
+ numPartitions: Int = 1,
+ replicationFactor: Int = 1,
+ topicConfig: Properties = new Properties,
+ adminClientConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
+ if (isKRaftTest()) {
+ TestUtils.createTopicWithAdmin(topic, numPartitions, replicationFactor, brokers, topicConfig, adminClientConfig)
+ } else {
+ TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
+ }
+ }
/**
* Create a topic in ZooKeeper using a customized replica assignment.
@@ -140,20 +167,28 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
+ def deleteTopic(topic: String): Unit = {
+ if (isKRaftTest()) {
+ TestUtils.deleteTopicWithAdmin(topic, brokers)
+ } else {
+ adminZkClient.deleteTopic(topic)
+ }
+ }
+
/**
* Pick a broker at random and kill it if it isn't already dead
* Return the id of the broker killed
*/
def killRandomBroker(): Int = {
- val index = TestUtils.random.nextInt(servers.length)
+ val index = TestUtils.random.nextInt(_brokers.length)
killBroker(index)
index
}
def killBroker(index: Int): Unit = {
if(alive(index)) {
- servers(index).shutdown()
- servers(index).awaitShutdown()
+ _brokers(index).shutdown()
+ _brokers(index).awaitShutdown()
alive(index) = false
}
}
@@ -165,32 +200,50 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
if (reconfigure) {
instanceConfigs = null
}
- for(i <- servers.indices if !alive(i)) {
+ for(i <- _brokers.indices if !alive(i)) {
if (reconfigure) {
- servers(i) = TestUtils.createServer(
+ _brokers(i) = TestUtils.createServer(
configs(i),
time = brokerTime(configs(i).brokerId),
threadNamePrefix = None,
enableForwarding
)
}
- servers(i).startup()
+ _brokers(i).startup()
alive(i) = true
}
}
def waitForUserScramCredentialToAppearOnAllBrokers(clientPrincipal: String, mechanismName: String): Unit = {
- servers.foreach { server =>
+ _brokers.foreach { server =>
val cache = server.credentialProvider.credentialCache.cache(mechanismName, classOf[ScramCredential])
TestUtils.waitUntilTrue(() => cache.get(clientPrincipal) != null, s"SCRAM credentials not created for $clientPrincipal")
}
}
def getController(): KafkaServer = {
+ checkIsZKTest()
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
servers.filter(s => s.config.brokerId == controllerId).head
}
+ def getTopicIds(names: Seq[String]): Map[String, Uuid] = {
+ val result = new util.HashMap[String, Uuid]()
+ if (isKRaftTest()) {
+ val topicIdsMap = controllerServer.controller.findTopicIds(Long.MaxValue, names.asJava).get()
+ names.foreach { name =>
+ val response = topicIdsMap.get(name)
+ result.put(name, response.result())
+ }
+ } else {
+ val topicIdsMap = getController().kafkaController.controllerContext.topicIds.toMap
+ names.foreach { name =>
+ if (topicIdsMap.contains(name)) result.put(name, topicIdsMap.get(name).get)
+ }
+ }
+ result.asScala.toMap
+ }
+
def getTopicIds(): Map[String, Uuid] = {
getController().kafkaController.controllerContext.topicIds.toMap
}
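The `brokers`/`servers` split in KafkaServerTestHarness above can be illustrated in isolation. The following is a minimal sketch, not the harness itself: `Broker`, `ZkBroker`, `KRaftBroker`, and `MiniHarness` are hypothetical stand-ins for `KafkaBroker`, `KafkaServer`, `BrokerServer`, and the harness, and the `kraft` constructor flag stands in for `isKRaftTest()`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; none of these types exist in Kafka.
trait Broker { def id: Int }
final case class ZkBroker(id: Int) extends Broker
final case class KRaftBroker(id: Int) extends Broker

class MiniHarness(kraft: Boolean) {
  // One private buffer holds whichever broker type the current mode creates.
  private val _brokers = new mutable.ArrayBuffer[Broker]

  // Mode-agnostic view: usable from tests that run in both modes.
  def brokers: mutable.Buffer[Broker] = _brokers

  // ZK-only view: fail fast with a clear message rather than a ClassCastException.
  def servers: mutable.Buffer[ZkBroker] = {
    if (kraft) throw new IllegalStateException("servers is only valid in ZK mode")
    _brokers.map(_.asInstanceOf[ZkBroker])
  }

  // Create `count` brokers of the type that matches the current mode.
  def start(count: Int): Unit =
    for (i <- 0 until count)
      _brokers += (if (kraft) KRaftBroker(i) else ZkBroker(i))
}
```

In this sketch, `servers` builds a new buffer on every call, so appending to the result does not change `_brokers`. Tests meant to run in both modes would go through `brokers` and leave `servers` to ZK-only code paths.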
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index cc3c69a..e045ea9 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -23,7 +23,7 @@ import kafka.server.KafkaConfig
import kafka.utils.{Logging, TestUtils}
import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaYammerMetrics
@@ -52,7 +52,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
.map(KafkaConfig.fromProps(_, overridingProps))
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
// Do some Metrics Registry cleanup by removing the metrics that this test checks.
// This is a test workaround to the issue that prior harness runs may have left a populated registry.
// see https://issues.apache.org/jira/browse/KAFKA-4605
@@ -61,7 +61,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
metricName.foreach(KafkaYammerMetrics.defaultRegistry.removeMetric)
}
- super.setUp()
+ super.setUp(testInfo)
}
/*
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index d2f9462..9db9e3f 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -18,7 +18,7 @@
package kafka.integration
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.util.Random
import scala.jdk.CollectionConverters._
@@ -30,7 +30,7 @@ import java.util.concurrent.ExecutionException
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.network.ListenerName
@@ -41,7 +41,7 @@ import org.junit.jupiter.api.Assertions._
import scala.annotation.nowarn
-class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
+class UncleanLeaderElectionTest extends QuorumTestHarness {
val brokerId1 = 0
val brokerId2 = 1
@@ -63,8 +63,8 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor])
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
configProps1 = createBrokerConfig(brokerId1, zkConnect)
configProps2 = createBrokerConfig(brokerId2, zkConnect)
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 790536b..f4e69f9 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
import javax.management.ObjectName
import com.yammer.metrics.core.MetricPredicate
-import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
@@ -34,7 +33,11 @@ import kafka.log.LogConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.JmxReporter
import org.apache.kafka.common.utils.Time
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+@Timeout(120)
class MetricsTest extends KafkaServerTestHarness with Logging {
val numNodes = 2
val numParts = 2
@@ -44,57 +47,64 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
overridingProps.put(JmxReporter.EXCLUDE_CONFIG, s"$requiredKafkaServerPrefix=ClusterId")
- def generateConfigs =
- TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
+ def generateConfigs: Seq[KafkaConfig] =
+ TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false).
+ map(KafkaConfig.fromProps(_, overridingProps))
val nMessages = 2
- @Test
- def testMetricsReporterAfterDeletingTopic(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testMetricsReporterAfterDeletingTopic(quorum: String): Unit = {
val topic = "test-topic-metric"
createTopic(topic, 1, 1)
- adminZkClient.deleteTopic(topic)
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ deleteTopic(topic)
+ TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}
- @Test
- def testBrokerTopicMetricsUnregisteredAfterDeletingTopic(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testBrokerTopicMetricsUnregisteredAfterDeletingTopic(quorum: String): Unit = {
val topic = "test-broker-topic-metric"
createTopic(topic, 2, 1)
// Produce a few messages to create the metrics
// Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
- TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+ TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
- servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
- adminZkClient.deleteTopic(topic)
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
+ deleteTopic(topic)
+ TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}
- @Test
- def testClusterIdMetric(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testClusterIdMetric(quorum: String): Unit = {
// Check if clusterId metric exists.
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=ClusterId"), 1)
}
- @Test
- def testBrokerStateMetric(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testBrokerStateMetric(quorum: String): Unit = {
// Check if BrokerState metric exists.
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=BrokerState"), 1)
}
- @Test
- def testYammerMetricsCountMetric(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testYammerMetricsCountMetric(quorum: String): Unit = {
// Check if yammer-metrics-count metric exists.
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=yammer-metrics-count"), 1)
}
- @Test
- def testLinuxIoMetrics(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLinuxIoMetrics(quorum: String): Unit = {
// Check if linux-disk-{read,write}-bytes metrics either do or do not exist depending on whether we are or are not
// able to collect those metrics on the platform where this test is running.
val usable = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying).usable()
@@ -104,8 +114,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=$name"), expectedCount))
}
- @Test
- def testJMXFilter(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testJMXFilter(quorum: String): Unit = {
// Check if cluster id metrics is not exposed in JMX
assertTrue(ManagementFactory.getPlatformMBeanServer
.isRegistered(new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")))
@@ -113,10 +124,11 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
.isRegistered(new ObjectName(s"$requiredKafkaServerPrefix=ClusterId")))
}
- @Test
- def testUpdateJMXFilter(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testUpdateJMXFilter(quorum: String): Unit = {
// verify previously exposed metrics are removed and existing matching metrics are added
- servers.foreach(server => server.kafkaYammerMetrics.reconfigure(
+ brokers.foreach(broker => broker.kafkaYammerMetrics.reconfigure(
Map(JmxReporter.EXCLUDE_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
))
assertFalse(ManagementFactory.getPlatformMBeanServer
@@ -125,22 +137,24 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
.isRegistered(new ObjectName(s"$requiredKafkaServerPrefix=ClusterId")))
}
- @Test
- def testGeneralBrokerTopicMetricsAreGreedilyRegistered(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testGeneralBrokerTopicMetricsAreGreedilyRegistered(quorum: String): Unit = {
val topic = "test-broker-topic-metric"
createTopic(topic, 2, 1)
// The broker metrics for all topics should be greedily registered
assertTrue(topicMetrics(None).nonEmpty, "General topic metrics don't exist")
- assertEquals(servers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size)
+ assertEquals(brokers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size)
// topic metrics should be lazily registered
assertTrue(topicMetricGroups(topic).isEmpty, "Topic metrics aren't lazily registered")
- TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+ TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics aren't registered")
}
- @Test
- def testWindowsStyleTagNames(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testWindowsStyleTagNames(quorum: String): Unit = {
val path = "C:\\windows-path\\kafka-logs"
val tags = Map("dir" -> path)
val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
@@ -148,8 +162,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assert(metric.getMBeanName.endsWith(expectedMBeanName))
}
- @Test
- def testBrokerTopicMetricsBytesInOut(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testBrokerTopicMetricsBytesInOut(quorum: String): Unit = {
val topic = "test-bytes-in-out"
val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
@@ -160,20 +175,20 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
createTopic(topic, 1, numNodes, topicConfig)
// Produce a few messages to create the metrics
- TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+ TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
// Check the log size for each broker so that we can distinguish between failures caused by replication issues
// versus failures caused by the metrics
val topicPartition = new TopicPartition(topic, 0)
- servers.foreach { server =>
- val log = server.logManager.getLog(new TopicPartition(topic, 0))
- val brokerId = server.config.brokerId
+ brokers.foreach { broker =>
+ val log = broker.logManager.getLog(new TopicPartition(topic, 0))
+ val brokerId = broker.config.brokerId
val logSize = log.map(_.size)
assertTrue(logSize.exists(_ > 0), s"Expected broker $brokerId to have a Log for $topicPartition with positive size, actual: $logSize")
}
// Consume messages to make bytesOut tick
- TestUtils.consumeTopicRecords(servers, topic, nMessages)
+ TestUtils.consumeTopicRecords(brokers, topic, nMessages)
val initialReplicationBytesIn = TestUtils.meterCount(replicationBytesIn)
val initialReplicationBytesOut = TestUtils.meterCount(replicationBytesOut)
val initialBytesIn = TestUtils.meterCount(bytesIn)
@@ -183,20 +198,21 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(initialBytesOut, TestUtils.meterCount(bytesOut))
// Produce a few messages to make the metrics tick
- TestUtils.generateAndProduceMessages(servers, topic, nMessages)
+ TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
assertTrue(TestUtils.meterCount(replicationBytesIn) > initialReplicationBytesIn)
assertTrue(TestUtils.meterCount(replicationBytesOut) > initialReplicationBytesOut)
assertTrue(TestUtils.meterCount(bytesIn) > initialBytesIn)
// Consume messages to make bytesOut tick
- TestUtils.consumeTopicRecords(servers, topic, nMessages)
+ TestUtils.consumeTopicRecords(brokers, topic, nMessages)
assertTrue(TestUtils.meterCount(bytesOut) > initialBytesOut)
}
- @Test
- def testControllerMetrics(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk"))
+ def testZkControllerMetrics(quorum: String): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
@@ -216,13 +232,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
* Test that the metrics are created with the right name, testZooKeeperStateChangeRateMetrics
* and testZooKeeperSessionStateMetric in ZooKeeperClientTest test the metrics behaviour.
*/
- @Test
- def testSessionExpireListenerMetrics(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSessionExpireListenerMetrics(quorum: String): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-
- assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState"), 1)
- assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"), 1)
- assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"), 1)
+ val expectedNumMetrics = if (isKRaftTest()) 0 else 1
+ assertEquals(expectedNumMetrics, metrics.keySet.asScala.
+ count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState"))
+ assertEquals(expectedNumMetrics, metrics.keySet.asScala.
+ count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"))
+ assertEquals(expectedNumMetrics, metrics.keySet.asScala.
+ count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"))
}
private def topicMetrics(topic: Option[String]): Set[String] = {
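The `testSessionExpireListenerMetrics` change above keeps a single test body for both modes and varies only the expected count. A minimal sketch of that pattern follows, assuming a plain set of MBean names in place of the Yammer registry. `expectedCount` and `countByName` are hypothetical helpers, not part of Kafka or JUnit.

```scala
object ModeDependentExpectation {
  // Hypothetical helper: the ZooKeeper session metrics are expected once in ZK mode
  // and not at all in KRaft mode, mirroring `if (isKRaftTest()) 0 else 1`.
  def expectedCount(quorum: String): Int = quorum match {
    case "zk"    => 1
    case "kraft" => 0
    case other   => throw new IllegalArgumentException(s"Unknown quorum: $other")
  }

  // Count the registered names that exactly match the given MBean name.
  def countByName(registered: Set[String], mbeanName: String): Int =
    registered.count(_ == mbeanName)
}
```

Rejecting unknown quorum strings is a choice made for this sketch; it makes a misspelled `@ValueSource` value fail loudly instead of silently running as ZK.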
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 74803ce..3bbce4d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -19,13 +19,14 @@ package kafka.security.auth
import java.nio.charset.StandardCharsets
import kafka.admin.ZkSecurityMigrator
+import kafka.server.QuorumTestHarness
import kafka.utils.{Logging, TestUtils}
import kafka.zk._
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.data.{ACL, Stat}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.util.{Failure, Success, Try}
import javax.security.auth.login.Configuration
@@ -40,16 +41,16 @@ import org.apache.zookeeper.client.ZKClientConfig
import scala.jdk.CollectionConverters._
import scala.collection.Seq
-class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
+class ZkAuthorizationTest extends QuorumTestHarness with Logging {
val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
val authProvider = "zookeeper.authProvider.1"
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
Configuration.setConfiguration(null)
System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
- super.setUp()
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index a49b0d3..9011eb6 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -25,9 +25,9 @@ import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import kafka.Kafka
import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
-import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
+import kafka.zk.ZkAclStore
import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
@@ -44,12 +44,12 @@ import org.apache.kafka.server.authorizer._
import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
-class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
+class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
private val allowReadAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, READ, ALLOW)
private val allowWriteAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW)
@@ -70,8 +70,8 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
override def authorizer: Authorizer = aclAuthorizer
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
// Increase maxUpdateRetries to avoid transient failures
aclAuthorizer.maxUpdateRetries = Int.MaxValue
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
index 15ea42d..e38cafd 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
@@ -25,10 +25,10 @@ import javax.security.auth.Subject
import javax.security.auth.callback.CallbackHandler
import kafka.api.SaslSetup
import kafka.security.authorizer.AclEntry.WildcardHost
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, QuorumTestHarness}
import kafka.utils.JaasTestUtils.{JaasModule, JaasSection}
import kafka.utils.{JaasTestUtils, TestUtils}
-import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.KafkaZkClient
import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
import org.apache.kafka.common.acl.AclOperation.{READ, WRITE}
@@ -43,12 +43,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.apache.zookeeper.server.auth.DigestLoginModule
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
-class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
+class AclAuthorizerWithZkSaslTest extends QuorumTestHarness with SaslSetup {
private val aclAuthorizer = new AclAuthorizer
private val aclAuthorizer2 = new AclAuthorizer
@@ -60,7 +60,7 @@ class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
private var config: KafkaConfig = _
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
// Allow failed clients to avoid server closing the connection before reporting AuthFailed.
System.setProperty("zookeeper.allowSaslFailedClients", "true")
@@ -76,7 +76,7 @@ class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
aclAuthorizer.maxUpdateRetries = Int.MaxValue
aclAuthorizer2.maxUpdateRetries = Int.MaxValue
- super.setUp()
+ super.setUp(testInfo)
config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))
aclAuthorizer.configure(config.originals)
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
index bccd58a..6852ab3 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
@@ -20,24 +20,24 @@ import java.util.concurrent.CompletionStage
import java.{lang, util}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
+class AuthorizerInterfaceDefaultTest extends QuorumTestHarness with BaseAuthorizerTest {
private val interfaceDefaultAuthorizer = new DelegateAuthorizer
override def authorizer: Authorizer = interfaceDefaultAuthorizer
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
// Increase maxUpdateRetries to avoid transient failures
interfaceDefaultAuthorizer.authorizer.maxUpdateRetries = Int.MaxValue
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index 16de1df..523b6a7 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -24,9 +24,9 @@ import java.util.{Base64, Properties}
import kafka.network.RequestChannel.Session
import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils}
import kafka.security.authorizer.AclEntry.WildcardHost
-import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig}
+import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
-import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclOperation}
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
@@ -41,12 +41,12 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time}
import org.apache.kafka.server.authorizer._
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
-class DelegationTokenManagerTest extends ZooKeeperTestHarness {
+class DelegationTokenManagerTest extends QuorumTestHarness {
val time = new MockTime()
val owner = SecurityUtils.parseKafkaPrincipal("User:owner")
@@ -64,8 +64,8 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
var expiryTimeStamp: Long = 0
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
props = TestUtils.createBrokerConfig(0, zkConnect, enableToken = true)
props.put(KafkaConfig.SaslEnabledMechanismsProp, ScramMechanism.mechanismNames().asScala.mkString(","))
props.put(KafkaConfig.DelegationTokenSecretKeyProp, secretKey)
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
index b4140d0..0a98d26 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -35,8 +35,8 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
createTopic(topic1, numPartitions, servers.size, new Properties())
}
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index db01370..3f74863 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -19,13 +19,13 @@ package kafka.server
import org.junit.jupiter.api.Assertions._
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.{AfterEach, Test}
import scala.collection.mutable.ArrayBuffer
-class AdvertiseBrokerTest extends ZooKeeperTestHarness {
+class AdvertiseBrokerTest extends QuorumTestHarness {
val servers = ArrayBuffer[KafkaServer]()
val brokerId = 0
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index 46be884..0e1b148 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -24,7 +24,7 @@ import kafka.cluster.Broker
import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.createTopic
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
@@ -36,19 +36,19 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
-class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
+class BrokerEpochIntegrationTest extends QuorumTestHarness {
val brokerId1 = 0
val brokerId2 = 1
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val configs = Seq(
TestUtils.createBrokerConfig(brokerId1, zkConnect),
TestUtils.createBrokerConfig(brokerId2, zkConnect))
diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 20e89ed..c05044e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -45,8 +45,7 @@ import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -106,8 +105,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
// Define a quota for ThrottledPrincipal
defineUserQuota(ThrottledPrincipal.getName, Some(ControllerMutationRate))
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 2addda2..1d355ce 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -21,7 +21,7 @@ import java.util
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions.assertThrows
import scala.concurrent.ExecutionException
@@ -32,8 +32,8 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
override def brokerCount = 1
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
}
def createAdminConfig: util.Map[String, Object] = {
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index a6a09ee..4f304bd 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.errors.InvalidPrincipalTypeException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.SecurityUtils
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util
import scala.concurrent.ExecutionException
@@ -40,9 +40,9 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
override def brokerCount = 1
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
}
override def generateConfigs = {
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index ba621f2..e1e5c30 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.errors.DelegationTokenDisabledException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions.assertThrows
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util
import scala.concurrent.ExecutionException
@@ -38,9 +38,9 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
override def brokerCount = 1
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ super.setUp(testInfo)
}
def createAdminConfig: util.Map[String, Object] = {
diff --git a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index 5b2a16e..222ff2d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.lang.{Byte => JByte}
import java.util.Properties
+
import kafka.network.SocketServer
import kafka.security.authorizer.AclEntry
import org.apache.kafka.common.message.{DescribeClusterRequestData, DescribeClusterResponseData}
@@ -27,7 +28,7 @@ import org.apache.kafka.common.requests.{DescribeClusterRequest, DescribeCluster
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -40,8 +41,8 @@ class DescribeClusterRequestTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = false)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = false)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 3bcb626..1fb9f33 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -18,13 +18,13 @@ package kafka.server
import kafka.admin.AdminOperationException
import kafka.utils.CoreUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.config._
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test
-class DynamicConfigTest extends ZooKeeperTestHarness {
+class DynamicConfigTest extends QuorumTestHarness {
private final val nonExistentConfig: String = "some.config.that.does.not.exist"
private final val someValue: String = "some interesting value"
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index b21666e..7038678a 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -35,8 +35,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
override def brokerCount: Int = 1
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
initProducer()
}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
index f8166dd..db57ec9 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util.{Optional, Properties}
import scala.jdk.CollectionConverters._
@@ -58,8 +58,8 @@ class FetchRequestMaxBytesTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers))
}
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
index d7de098..d59474e 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
@@ -19,7 +19,8 @@ package kafka.server
import java.util.concurrent.{CountDownLatch, TimeoutException}
-import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, ZooKeeperTestHarness}
+import kafka.server.QuorumTestHarness
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
@@ -29,7 +30,7 @@ import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
-class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {
+class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
private def createBrokerFeatures(): BrokerFeatures = {
val supportedFeaturesMap = Map[String, SupportedVersionRange](
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index cd14d3c..26f6a12 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -20,11 +20,11 @@ import java.util.concurrent.atomic.AtomicReference
import kafka.metrics.KafkaMetricsReporter
import kafka.utils.{CoreUtils, TestUtils, VerifiableProperties}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
import org.apache.kafka.test.MockMetricsReporter
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.apache.kafka.test.TestUtils.isValidClusterId
object KafkaMetricReporterClusterIdTest {
@@ -75,13 +75,13 @@ object KafkaMetricReporterClusterIdTest {
}
}
-class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
+class KafkaMetricReporterClusterIdTest extends QuorumTestHarness {
var server: KafkaServer = null
var config: KafkaConfig = null
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val props = TestUtils.createBrokerConfig(1, zkConnect)
props.setProperty(KafkaConfig.KafkaMetricsReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
index f480bc5..c84a91b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.common.message.ListGroupsRequestData
@@ -44,8 +44,8 @@ class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
// need a quota prop to register a "throttle-time" metrics after server startup
val quotaProps = new Properties()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 8a3d7cf..7e5d791 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -21,10 +21,10 @@ import java.util
import java.util.concurrent.atomic.AtomicReference
import kafka.utils.{CoreUtils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
import org.junit.jupiter.api.Assertions.{assertEquals}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
@@ -59,13 +59,13 @@ object KafkaMetricsReporterTest {
}
}
-class KafkaMetricsReporterTest extends ZooKeeperTestHarness {
+class KafkaMetricsReporterTest extends QuorumTestHarness {
var server: KafkaServer = null
var config: KafkaConfig = null
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val props = TestUtils.createBrokerConfig(1, zkConnect)
props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 8056e23..6ab6930 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -19,13 +19,13 @@ package kafka.server
import kafka.api.ApiVersion
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, fail}
import org.junit.jupiter.api.Test
import java.util.Properties
-class KafkaServerTest extends ZooKeeperTestHarness {
+class KafkaServerTest extends QuorumTestHarness {
@Test
def testAlreadyRegisteredAdvertisedListeners(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index e5e27b5..a1fb7cd 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -29,16 +29,16 @@ import kafka.utils.TestUtils
import kafka.cluster.Broker
import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
-class LeaderElectionTest extends ZooKeeperTestHarness {
+class LeaderElectionTest extends QuorumTestHarness {
val brokerId1 = 0
val brokerId2 = 1
@@ -47,8 +47,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
var staleControllerEpochDetected = false
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown = false)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown = false)
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6d6d881..1025d7a 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException}
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -50,8 +50,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
createTopic(topic, partitionNum, brokerCount)
}
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 93c7094..30fc72d 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -22,17 +22,17 @@ import scala.collection.Seq
import kafka.utils.TestUtils
import TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import java.io.File
import kafka.server.checkpoints.OffsetCheckpointFile
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
-class LogRecoveryTest extends ZooKeeperTestHarness {
+class LogRecoveryTest extends QuorumTestHarness {
val replicaLagTimeMaxMs = 5000L
val replicaLagMaxMessages = 10L
@@ -75,8 +75,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 42eebb3..d91d58e 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.test.TestUtils.isValidClusterId
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -36,8 +36,8 @@ import scala.jdk.CollectionConverters._
class MetadataRequestTest extends AbstractMetadataRequestTest {
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = false)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = false)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
index 8409c96..3580e2b 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataRequest
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -30,8 +30,8 @@ import scala.jdk.CollectionConverters._
class MetadataRequestWithForwardingTest extends AbstractMetadataRequestTest {
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = false)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = false)
}
override def enableForwarding: Boolean = true
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 5e821fb..477f3eb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -25,10 +25,10 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test}
-
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import java.util
import java.util.Collections.singletonList
+
import scala.jdk.CollectionConverters._
import java.util.{Optional, Properties}
@@ -74,8 +74,8 @@ class OffsetFetchRequestTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = false)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = false)
TestUtils.createOffsetsTopic(zkClient, servers)
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 963c9bf..4a5ab60 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -19,22 +19,22 @@ package kafka.server
import scala.collection.Seq
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import kafka.zk.ZooKeeperTestHarness
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import kafka.server.QuorumTestHarness
import kafka.utils.TestUtils
import TestUtils._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringSerializer
-class ReplicaFetchTest extends ZooKeeperTestHarness {
+class ReplicaFetchTest extends QuorumTestHarness {
var brokers: Seq[KafkaServer] = null
val topic1 = "foo"
val topic2 = "bar"
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val props = createBrokerConfigs(2, zkConnect)
brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index cbd882d..4b7cae8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -25,7 +25,7 @@ import kafka.server.QuotaType._
import kafka.utils.TestUtils._
import kafka.utils.CoreUtils._
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.junit.jupiter.api.Assertions._
@@ -42,7 +42,7 @@ import scala.jdk.CollectionConverters._
*
* Anything over 100MB/s tends to fail as this is the non-throttled replication rate
*/
-class ReplicationQuotasTest extends ZooKeeperTestHarness {
+class ReplicationQuotasTest extends QuorumTestHarness {
def percentError(percent: Int, value: Long): Long = Math.round(value * percent / 100.0)
val msg100KB = new Array[Byte](100000)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e0bb504..56359df 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -50,7 +50,7 @@ import org.apache.kafka.common._
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
@@ -85,9 +85,9 @@ class RequestQuotaTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
- super.setUp()
+ super.setUp(testInfo)
createTopic(topic, numPartitions)
leaderNode = servers.head
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index d6b8103..096debe 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -20,15 +20,15 @@ import java.util.Properties
import scala.collection.Seq
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import kafka.utils.TestUtils
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import java.io.File
import org.apache.zookeeper.KeeperException.NodeExistsException
-class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
+class ServerGenerateBrokerIdTest extends QuorumTestHarness {
var props1: Properties = null
var config1: KafkaConfig = null
var props2: Properties = null
@@ -37,8 +37,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
var servers: Seq[KafkaServer] = Seq()
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
props1 = TestUtils.createBrokerConfig(-1, zkConnect)
config1 = KafkaConfig.fromProps(props1)
props2 = TestUtils.createBrokerConfig(0, zkConnect)
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 0e9c744..fd9b365 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -26,14 +26,14 @@ import ExecutionContext.Implicits._
import kafka.common.{InconsistentBrokerMetadataException, InconsistentClusterIdException}
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.apache.kafka.test.TestUtils.isValidClusterId
-class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
+class ServerGenerateClusterIdTest extends QuorumTestHarness {
var config1: KafkaConfig = null
var config2: KafkaConfig = null
var config3: KafkaConfig = null
@@ -41,8 +41,8 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
val brokerMetaPropsFile = "meta.properties"
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
config1 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, zkConnect))
config2 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(2, zkConnect))
config3 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(3, zkConnect))
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index a523bd7..013d084 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.server
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
@@ -40,14 +40,14 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@Timeout(60)
-class ServerShutdownTest extends ZooKeeperTestHarness {
+class ServerShutdownTest extends QuorumTestHarness {
var config: KafkaConfig = null
val host = "localhost"
val topic = "test"
@@ -55,8 +55,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
val sent2 = List("more", "messages")
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
val props = TestUtils.createBrokerConfig(0, zkConnect)
config = KafkaConfig.fromProps(props)
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index bf85e99..e80084d 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -18,14 +18,14 @@
package kafka.server
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.KafkaException
import org.apache.kafka.metadata.BrokerState
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
-class ServerStartupTest extends ZooKeeperTestHarness {
+class ServerStartupTest extends QuorumTestHarness {
private var server: KafkaServer = null
diff --git a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
index 5c38026..224474e 100644
--- a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, MetadataRequest, MetadataResponse}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -43,8 +43,8 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest {
}
@BeforeEach
- override def setUp(): Unit = {
- doSetup(createOffsetsTopic = false)
+ override def setUp(testInfo: TestInfo): Unit = {
+ doSetup(testInfo, createOffsetsTopic = false)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 33776cf..72d2866 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -26,14 +26,14 @@ import kafka.server.{KafkaConfig, KafkaServer}
import kafka.tools.DumpLogSegments
import kafka.utils.{CoreUtils, Logging, TestUtils}
import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ListBuffer => Buffer}
@@ -47,7 +47,7 @@ import scala.collection.Seq
*
* A test which validates the end to end workflow is also included.
*/
-class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness with Logging {
+class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
// Set this to KAFKA_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
val apiVersion = ApiVersion.latestVersion
@@ -59,8 +59,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = null
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
}
@AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 206cee6..2557d35 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -22,7 +22,7 @@ import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
import kafka.utils.Implicits._
import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors._
@@ -43,7 +43,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.Map
import scala.collection.mutable.ListBuffer
-class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
+class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
var brokers: ListBuffer[KafkaServer] = ListBuffer()
val topic1 = "foo"
val topic2 = "bar"
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index c62224c..a610956 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -19,12 +19,13 @@ package kafka.utils
import kafka.api.LeaderAndIsr
import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.server.QuorumTestHarness
import kafka.zk._
import org.apache.kafka.common.TopicPartition
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-class ReplicationUtilsTest extends ZooKeeperTestHarness {
+class ReplicationUtilsTest extends QuorumTestHarness {
private val zkVersion = 1
private val topic = "my-topic-test"
private val partition = 0
@@ -34,8 +35,8 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
private val isr = List(1, 2)
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
zkClient.makeSurePersistentPathExists(TopicZNode.path(topic))
val topicPartition = new TopicPartition(topic, partition)
val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, 1)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 378c0f5..def86cb 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -67,6 +67,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
+import org.apache.kafka.controller.QuorumController
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException
@@ -2144,7 +2145,8 @@ object TestUtils extends Logging {
KafkaProducer.NETWORK_THREAD_PREFIX,
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
- ZooKeeperTestHarness.ZkClientEventThreadSuffix
+ QuorumTestHarness.ZkClientEventThreadSuffix,
+ QuorumController.CONTROLLER_THREAD_SUFFIX
)
def unexpectedThreads: Set[String] = {
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 253188c..5e9817b 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -23,11 +23,11 @@ import kafka.controller.ReplicaAssignment
import kafka.log._
import kafka.server.DynamicConfig.Broker._
import kafka.server.KafkaConfig._
-import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
+import kafka.server.{ConfigType, KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.utils.CoreUtils._
import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
-import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
@@ -40,7 +40,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, immutable}
-class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+class AdminZkClientTest extends QuorumTestHarness with Logging with RackAwareTest {
var servers: Seq[KafkaServer] = Seq()
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 2088e5f..6be954d 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
-import kafka.server.{ConfigType, KafkaConfig}
+import kafka.server.{ConfigType, KafkaConfig, QuorumTestHarness}
import kafka.utils.CoreUtils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.network.ListenerName
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.zookeeper.KeeperException.{Code, NoAuthException, NoNodeException, NodeExistsException}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
@@ -56,7 +56,7 @@ import org.apache.zookeeper.data.Stat
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-class KafkaZkClientTest extends ZooKeeperTestHarness {
+class KafkaZkClientTest extends QuorumTestHarness {
private val group = "my-group"
private val topic1 = "topic1"
@@ -73,8 +73,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
var expiredSessionZkClient: ExpiredKafkaZkClient = _
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
zkClient.createControllerEpochRaw(1)
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
deleted file mode 100755
index a81cbb9..0000000
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.zk
-
-import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.client.ZKClientConfig
-import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
-
-@Tag("integration")
-abstract class ZooKeeperTestHarness extends Logging {
-
- val zkConnectionTimeout = 10000
- val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
- val zkMaxInFlightRequests = Int.MaxValue
-
- protected def zkAclsEnabled: Option[Boolean] = None
-
- var zkClient: KafkaZkClient = null
- var adminZkClient: AdminZkClient = null
-
- var zookeeper: EmbeddedZookeeper = null
-
- def zkPort: Int = zookeeper.port
- def zkConnect: String = s"127.0.0.1:$zkPort"
-
- @BeforeEach
- def setUp(): Unit = {
- zookeeper = new EmbeddedZookeeper()
- zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "ZooKeeperTestHarness", new ZKClientConfig)
- adminZkClient = new AdminZkClient(zkClient)
- }
-
- @AfterEach
- def tearDown(): Unit = {
- shutdownZooKeeper()
- Configuration.setConfiguration(null)
- }
-
- def shutdownZooKeeper(): Unit = {
- if (zkClient != null)
- zkClient.close()
- if (zookeeper != null)
- CoreUtils.swallow(zookeeper.shutdown(), this)
- }
-
- // Trigger session expiry by reusing the session id in another client
- def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
- val dummyWatcher = new Watcher {
- override def process(event: WatchedEvent): Unit = {}
- }
- val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
- zooKeeper.getSessionId,
- zooKeeper.getSessionPasswd)
- assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
- anotherZkClient
- }
-}
-
-object ZooKeeperTestHarness {
- val ZkClientEventThreadSuffix = "-EventThread"
-
- /**
- * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
- * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
- * which is true for core tests where this harness is used.
- */
- @BeforeAll
- def setUpClass(): Unit = {
- TestUtils.verifyNoUnexpectedThreads("@BeforeAll")
- }
-
- /**
- * Verify that tests from the current test class using ZooKeeperTestHarness haven't left behind an unexpected thread
- */
- @AfterAll
- def tearDownClass(): Unit = {
- TestUtils.verifyNoUnexpectedThreads("@AfterAll")
- }
-
-}
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 5cfb9ee..649f3c5 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -26,7 +26,7 @@ import com.yammer.metrics.core.{Gauge, Meter, MetricName}
import kafka.server.KafkaConfig
import kafka.metrics.KafkaYammerMetrics
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.QuorumTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
@@ -35,21 +35,21 @@ import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertThrows, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
-class ZooKeeperClientTest extends ZooKeeperTestHarness {
+class ZooKeeperClientTest extends QuorumTestHarness {
private val mockPath = "/foo"
private val time = Time.SYSTEM
private var zooKeeperClient: ZooKeeperClient = _
@BeforeEach
- override def setUp(): Unit = {
+ override def setUp(testInfo: TestInfo): Unit = {
TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
cleanMetricsRegistry()
- super.setUp()
+ super.setUp(testInfo)
zooKeeperClient = newZooKeeperClient()
}
@@ -88,9 +88,9 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
def testConnection(): Unit = {
val client = newZooKeeperClient()
try {
- // Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients
+ // Verify ZooKeeper event thread name. This is used in QuorumTestHarness to verify that tests have closed ZK clients
val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
- assertTrue(threads.exists(_.contains(ZooKeeperTestHarness.ZkClientEventThreadSuffix)),
+ assertTrue(threads.exists(_.contains(QuorumTestHarness.ZkClientEventThreadSuffix)),
s"ZooKeeperClient event thread not found, threads=$threads")
} finally {
client.close()
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 67d4eb0..1a1c5d0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -232,7 +232,7 @@ public final class QuorumController implements Controller {
}
KafkaEventQueue queue = null;
try {
- queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+ queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
return new QuorumController(logContext, nodeId, queue, time, configDefs,
raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacer, snapshotMaxNewRecordBytes,
@@ -245,6 +245,8 @@ public final class QuorumController implements Controller {
}
}
+ public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
+
private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
"The active controller appears to be node ";
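Note on CONTROLLER_THREAD_SUFFIX: the TestUtils.scala change earlier in this commit adds this constant, next to QuorumTestHarness.ZkClientEventThreadSuffix ("-EventThread"), to the list of name fragments used when checking for unexpected leftover threads. The event queue presumably appends "EventHandler" to the prefix it receives, which would explain the constant's value, but that detail is not shown in this diff. The following is a minimal sketch of name-based leak detection under those assumptions; ThreadLeakSketch and its methods are hypothetical and do not reproduce TestUtils.verifyNoUnexpectedThreads.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch only: hypothetical helper, not the Kafka TestUtils implementation.
final class ThreadLeakSketch {
    // Name fragments of threads a test is expected to have shut down.
    // Both values appear in this commit's diff.
    static final List<String> UNEXPECTED_FRAGMENTS =
        List.of("-EventThread", "QuorumControllerEventHandler");

    // Return the thread names that contain at least one of the fragments.
    static Set<String> unexpectedThreadNames(Set<String> threadNames,
                                             List<String> fragments) {
        return threadNames.stream()
            .filter(name -> fragments.stream().anyMatch(name::contains))
            .collect(Collectors.toSet());
    }

    // Snapshot of the names of all live threads in this JVM.
    static Set<String> liveThreadNames() {
        return Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> leaked =
            unexpectedThreadNames(liveThreadNames(), UNEXPECTED_FRAGMENTS);
        System.out.println("Unexpected threads: " + leaked);
    }
}
```

Matching on a name fragment rather than a full name keeps the check independent of whatever prefix a particular test passes as threadNamePrefix.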
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index 31dd43f..e1dd217 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import kafka.api.IntegrationTestHarness;
+import kafka.utils.EmptyTestInfo;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
@@ -60,7 +61,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
public void initialize(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread) {
// Call setup to start the cluster.
- super.setUp();
+ super.setUp(new EmptyTestInfo());
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread);
}