Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/09 20:36:20 UTC

[kafka] branch 3.3 updated (ce7d0f5fa51 -> 8c8cb111a4f)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from ce7d0f5fa51 MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)
     new 8db3f0998c6 KAFKA-14124: improve quorum controller fault handling (#12447)
     new 2ee58902d4c MINOR: add :server-common test dependency to :storage (#12488)
     new bbd659325a6 KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)
     new 4a94d25b6d2 MINOR: convert some more junit tests to support KRaft (#12456)
     new 480e97914e1 KAFKA-13166 Fix missing ControllerApis error handling (#12403)
     new 112294334f4 MINOR: Convert some junit tests to kraft (#12443)
     new 8c8cb111a4f KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.gradle                                       |   4 +
 checkstyle/import-control-core.xml                 |   1 +
 checkstyle/import-control.xml                      |   4 +
 checkstyle/suppressions.xml                        |   2 +
 core/src/main/scala/kafka/server/AclApis.scala     |  32 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   5 +-
 .../main/scala/kafka/server/ControllerApis.scala   | 121 ++++---
 .../main/scala/kafka/server/ControllerServer.scala |  25 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  46 ++-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   6 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |   2 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  36 +-
 .../kafka/api/BaseProducerSendTest.scala           | 102 +++---
 .../kafka/api/PlaintextProducerSendTest.scala      |  51 +--
 .../kafka/api/ProducerCompressionTest.scala        |  49 +--
 .../api/TransactionsWithMaxInFlightOneTest.scala   |  24 +-
 .../kafka/server/QuorumTestHarness.scala           |  46 ++-
 .../kafka/tools/MirrorMakerIntegrationTest.scala   |  24 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 164 ++++++---
 .../AddPartitionsToTxnRequestServerTest.scala      |  16 +-
 .../unit/kafka/server/ControllerApisTest.scala     |  30 ++
 .../unit/kafka/server/EdgeCaseRequestTest.scala    |  32 +-
 .../kafka/server/KafkaMetricsReporterTest.scala    |  55 +--
 .../unit/kafka/server/ServerShutdownTest.scala     |  10 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  60 +++-
 .../apache/kafka/controller/QuorumController.java  | 382 ++++++++++++---------
 .../controller/ReplicationControlManager.java      |   8 +-
 .../metadata/fault/MetadataFaultException.java     |  17 +-
 .../kafka/metadata/fault/MetadataFaultHandler.java |  20 +-
 .../kafka/controller/QuorumControllerTest.java     |  25 ++
 .../kafka/controller/QuorumControllerTestEnv.java  |  15 +
 .../apache/kafka/server/fault/FaultHandler.java    |  58 ++++
 .../server/fault/ProcessExitingFaultHandler.java   |  19 +-
 .../kafka/server/fault/MockFaultHandler.java       |  65 ++++
 .../server/fault/MockFaultHandlerException.java    |  29 +-
 35 files changed, 1069 insertions(+), 516 deletions(-)
 copy clients/src/main/java/org/apache/kafka/common/errors/NotControllerException.java => metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java (74%)
 copy trogdor/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskController.java => metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java (62%)
 create mode 100644 server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
 copy trogdor/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskController.java => server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java (65%)
 create mode 100644 server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
 copy trogdor/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java => server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java (54%)


[kafka] 04/07: MINOR: convert some more junit tests to support KRaft (#12456)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4a94d25b6d2de94f92c2795eb40e585940198bb7
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Jul 29 13:36:40 2022 -0700

    MINOR: convert some more junit tests to support KRaft (#12456)
    
    * MINOR: convert some more junit tests to support KRaft
    
    Introduce TestUtils#waitUntilLeaderIsElectedOrChangedWithAdmin, a ZK-free alternative to
    TestUtils#waitUntilLeaderIsElectedOrChanged.
    
    Convert PlaintextProducerSendTest, SslProducerSendTest, TransactionsWithMaxInFlightOneTest,
    AddPartitionsToTxnRequestServerTest and KafkaMetricsReporterTest to support KRaft
    
    Reviewers: dengziming <de...@gmail.com>, David Arthur <mu...@gmail.com>
---
 .../kafka/api/BaseProducerSendTest.scala           | 102 ++++++++++++---------
 .../kafka/api/PlaintextProducerSendTest.scala      |  51 ++++++-----
 .../api/TransactionsWithMaxInFlightOneTest.scala   |  24 ++---
 .../kafka/server/QuorumTestHarness.scala           |  40 +++++---
 .../AddPartitionsToTxnRequestServerTest.scala      |  16 ++--
 .../kafka/server/KafkaMetricsReporterTest.scala    |  55 ++++++-----
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  60 +++++++++++-
 7 files changed, 226 insertions(+), 122 deletions(-)
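
For context, the conversion pattern applied throughout this commit looks roughly like the sketch below. This is illustrative only and assumes the kafka core test classpath; the class and test method names are invented, not taken from the commit. Each converted test accepts a "quorum" parameter so JUnit runs it once against ZooKeeper and once against KRaft, topics are created through an Admin client rather than zkClient, and leader election is checked with the new ZK-free helper.

    package kafka.api

    import kafka.utils.{TestInfoUtils, TestUtils}
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    // Hypothetical subclass, for illustration of the pattern only.
    class ExampleQuorumProducerSendTest extends BaseProducerSendTest {

      @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
      @ValueSource(strings = Array("zk", "kraft"))
      def testLeaderElectedAfterTopicCreation(quorum: String): Unit = {
        // Create the topic via the Admin client instead of zkClient/adminZkClient.
        TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
        // ZK-free replacement for TestUtils.waitUntilLeaderIsElectedOrChanged.
        TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
      }
    }
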

diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 61870b073d8..ce3cd32afde 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -19,22 +19,24 @@ package kafka.api
 
 import java.time.Duration
 import java.nio.charset.StandardCharsets
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.TimeUnit
-
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, NewPartitions}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.Buffer
@@ -42,16 +44,17 @@ import scala.concurrent.ExecutionException
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
-  def generateConfigs = {
+  def generateConfigs: scala.collection.Seq[KafkaConfig] = {
     val overridingProps = new Properties()
     val numServers = 2
     overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
-    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+    TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, false, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
   }
 
   private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
   private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  protected var admin: Admin = null
 
   protected val topic = "topic"
   private val numRecords = 100
@@ -59,6 +62,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
+
+    admin = TestUtils.createAdminClient(brokers, listenerName,
+        TestUtils.securityConfigs(Mode.CLIENT,
+          securityProtocol,
+          trustStoreFile,
+          "adminClient",
+          TestUtils.SslCertificateCn,
+          clientSaslProperties))
+
     consumer = TestUtils.createConsumer(
       bootstrapServers(listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
       securityProtocol = SecurityProtocol.PLAINTEXT
@@ -70,6 +82,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     consumer.close()
     // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused
     producers.foreach(_.close())
+    admin.close()
 
     super.tearDown()
   }
@@ -105,8 +118,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
    * 2. Last message of the non-blocking send should return the correct offset metadata
    */
-  @Test
-  def testSendOffset(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendOffset(quorum: String): Unit = {
     val producer = createProducer()
     val partition = 0
 
@@ -134,7 +148,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      createTopic(topic, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
       // send a normal record
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -166,8 +180,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testSendCompressedMessageWithCreateTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendCompressedMessageWithCreateTime(quorum: String): Unit = {
     val producer = createProducer(
       compressionType = "gzip",
       lingerMs = Int.MaxValue,
@@ -175,8 +190,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
-  @Test
-  def testSendNonCompressedMessageWithCreateTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendNonCompressedMessageWithCreateTime(quorum: String): Unit = {
     val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
@@ -186,7 +202,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
                               timeoutMs: Long = 20000L): Unit = {
     val partition = 0
     try {
-      createTopic(topic, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
       val futures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -241,7 +257,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
       else
         topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
-      createTopic(topic, 1, 2, topicProps)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
 
       val recordAndFutures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -267,13 +283,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    *
    * After close() returns, all messages should be sent with correct returned offset metadata
    */
-  @Test
-  def testClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClose(quorum: String): Unit = {
     val producer = createProducer()
 
     try {
       // create topic
-      createTopic(topic, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
@@ -300,12 +317,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    *
    * The specified partition-id should be respected
    */
-  @Test
-  def testSendToPartition(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendToPartition(quorum: String): Unit = {
     val producer = createProducer()
 
     try {
-      createTopic(topic, 2, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
       val partition = 1
 
       val now = System.currentTimeMillis()
@@ -345,14 +363,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     * Producer will attempt to send messages to the partition specified in each record, and should
     * succeed as long as the partition is included in the metadata.
     */
-  @Test
-  def testSendBeforeAndAfterPartitionExpansion(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendBeforeAndAfterPartitionExpansion(quorum: String): Unit = {
     val producer = createProducer(maxBlockMs = 5 * 1000L)
 
     // create topic
-    createTopic(topic, 1, 2)
-    val partition0 = 0
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
 
+    val partition0 = 0
     var futures0 = (1 to numRecords).map { i =>
       producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
     }.map(_.get(30, TimeUnit.SECONDS))
@@ -369,13 +388,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes(StandardCharsets.UTF_8))).get())
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
 
-    val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(Set(topic)).map {
-      case (topicPartition, assignment) => topicPartition.partition -> assignment
-    }
-    adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
+    admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(2))).all().get()
+
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic, 0)
-    TestUtils.waitForPartitionMetadata(servers, topic, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic, 0)
+    TestUtils.waitForPartitionMetadata(brokers, topic, 1)
 
     // send records to the newly added partition after confirming that metadata have been updated.
     val futures1 = (1 to numRecords).map { i =>
@@ -404,11 +421,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   /**
    * Test that flush immediately sends all accumulated requests.
    */
-  @Test
-  def testFlush(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFlush(quorum: String): Unit = {
     val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     try {
-      createTopic(topic, 2, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
         "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 0 until 50) {
@@ -425,9 +443,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   /**
    * Test close with zero timeout from caller thread
    */
-  @Test
-  def testCloseWithZeroTimeoutFromCallerThread(): Unit = {
-    createTopic(topic, 2, 2)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = {
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
@@ -450,9 +469,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   /**
    * Test close with zero and non-zero timeout from sender thread
    */
-  @Test
-  def testCloseWithZeroTimeoutFromSenderThread(): Unit = {
-    createTopic(topic, 1, 2)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = {
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 06ff201e0b2..c25eb184b3e 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -19,22 +19,23 @@ package kafka.api
 
 import java.util.Properties
 import java.util.concurrent.{ExecutionException, Future, TimeUnit}
-
 import kafka.log.LogConfig
 import kafka.server.Defaults
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 
 class PlaintextProducerSendTest extends BaseProducerSendTest {
 
-  @Test
-  def testWrongSerializer(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWrongSerializer(quorum: String): Unit = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
@@ -44,8 +45,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     assertThrows(classOf[SerializationException], () => producer.send(record))
   }
 
-  @Test
-  def testBatchSizeZero(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testBatchSizeZero(quorum: String): Unit = {
     val producer = createProducer(
       lingerMs = Int.MaxValue,
       deliveryTimeoutMs = Int.MaxValue,
@@ -53,8 +55,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     sendAndVerify(producer)
   }
 
-  @Test
-  def testSendCompressedMessageWithLogAppendTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendCompressedMessageWithLogAppendTime(quorum: String): Unit = {
     val producer = createProducer(
       compressionType = "gzip",
       lingerMs = Int.MaxValue,
@@ -62,8 +65,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
-  @Test
-  def testSendNonCompressedMessageWithLogAppendTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendNonCompressedMessageWithLogAppendTime(quorum: String): Unit = {
     val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
@@ -73,8 +77,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
    *
    * The topic should be created upon sending the first message
    */
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAutoCreateTopic(quorum: String): Unit = {
     val producer = createProducer()
     try {
       // Send a message to auto-create the topic
@@ -82,18 +87,18 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
 
       // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-
+      TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
     } finally {
       producer.close()
     }
   }
 
-  @Test
-  def testSendWithInvalidCreateTime(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithInvalidCreateTime(quorum: String): Unit = {
     val topicProps = new Properties()
     topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
-    createTopic(topic, 1, 2, topicProps)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
 
     val producer = createProducer()
     try {
@@ -118,8 +123,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   // Test that producer with max.block.ms=0 can be used to send in non-blocking mode
   // where requests are failed immediately without blocking if metadata is not available
   // or buffer is full.
-  @Test
-  def testNonBlockingProducer(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testNonBlockingProducer(quorum: String): Unit = {
 
     def send(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = {
       producer.send(new ProducerRecord(topic, 0, "key".getBytes, new Array[Byte](1000)))
@@ -173,8 +179,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     verifySendSuccess(future2)               // previous batch should be completed and sent now
   }
 
-  @Test
-  def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendRecordBatchWithMaxRequestSizeAndHigher(quorum: String): Unit = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index eacc58e76cc..5dd82b6b224 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -18,15 +18,16 @@
 package kafka.api
 
 import java.util.Properties
-
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.Seq
 import scala.collection.mutable.Buffer
@@ -37,7 +38,7 @@ import scala.jdk.CollectionConverters._
  * A single broker is used to verify edge cases where different requests are queued on the same connection.
  */
 class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
-  val numServers = 1
+  val numBrokers = 1
 
   val topic1 = "topic1"
   val topic2 = "topic2"
@@ -47,7 +48,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
   val transactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps()))
+    TestUtils.createBrokerConfigs(numBrokers, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
   }
 
   @BeforeEach
@@ -55,8 +56,8 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
     super.setUp(testInfo)
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 1.toString)
-    createTopic(topic1, numPartitions, numServers, topicConfig)
-    createTopic(topic2, numPartitions, numServers, topicConfig)
+    createTopic(topic1, numPartitions, numBrokers, topicConfig)
+    createTopic(topic2, numPartitions, numBrokers, topicConfig)
 
     createTransactionalProducer("transactional-producer")
     createReadCommittedConsumer("transactional-group")
@@ -69,10 +70,11 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
     super.tearDown()
   }
 
-  @Test
-  def testTransactionalProducerSingleBrokerMaxInFlightOne(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionalProducerSingleBrokerMaxInFlightOne(quorum: String): Unit = {
     // We want to test with one broker to verify multiple requests queued on a connection
-    assertEquals(1, servers.size)
+    assertEquals(1, brokers.size)
 
     val producer = transactionalProducers.head
     val consumer = transactionalConsumers.head
@@ -124,7 +126,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
   }
 
   private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = {
-    val producer = TestUtils.createTransactionalProducer(transactionalId, servers, maxInFlight = 1)
+    val producer = TestUtils.createTransactionalProducer(transactionalId, brokers, maxInFlight = 1)
     transactionalProducers += producer
     producer
   }
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 766eb3a7e53..9894df9c5f7 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -48,9 +48,12 @@ import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 trait QuorumImplementation {
-  def createBroker(config: KafkaConfig,
-                   time: Time,
-                   startup: Boolean): KafkaBroker
+  def createBroker(
+    config: KafkaConfig,
+    time: Time = Time.SYSTEM,
+    startup: Boolean = true,
+    threadNamePrefix: Option[String] = None,
+  ): KafkaBroker
 
   def shutdown(): Unit
 }
@@ -62,10 +65,13 @@ class ZooKeeperQuorumImplementation(
   val adminZkClient: AdminZkClient,
   val log: Logging
 ) extends QuorumImplementation {
-  override def createBroker(config: KafkaConfig,
-                            time: Time,
-                            startup: Boolean): KafkaBroker = {
-    val server = new KafkaServer(config, time, None, false)
+  override def createBroker(
+    config: KafkaConfig,
+    time: Time,
+    startup: Boolean,
+    threadNamePrefix: Option[String],
+  ): KafkaBroker = {
+    val server = new KafkaServer(config, time, threadNamePrefix, false)
     if (startup) server.startup()
     server
   }
@@ -82,9 +88,12 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV
                                 val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
                                 val clusterId: String,
                                 val log: Logging) extends QuorumImplementation {
-  override def createBroker(config: KafkaConfig,
-                            time: Time,
-                            startup: Boolean): KafkaBroker = {
+  override def createBroker(
+    config: KafkaConfig,
+    time: Time,
+    startup: Boolean,
+    threadNamePrefix: Option[String],
+  ): KafkaBroker = {
     val broker = new BrokerServer(config = config,
       metaProps = new MetaProperties(clusterId, config.nodeId),
       raftManager = raftManager,
@@ -222,10 +231,13 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
-  def createBroker(config: KafkaConfig,
-                   time: Time = Time.SYSTEM,
-                   startup: Boolean = true): KafkaBroker = {
-    implementation.createBroker(config, time, startup)
+  def createBroker(
+    config: KafkaConfig,
+    time: Time = Time.SYSTEM,
+    startup: Boolean = true,
+    threadNamePrefix: Option[String] = None
+  ): KafkaBroker = {
+    implementation.createBroker(config, time, startup, threadNamePrefix)
   }
 
   def shutdownZooKeeper(): Unit = asZk().shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
index 0a98d2626cd..74320e62b49 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
@@ -17,13 +17,16 @@
 
 package kafka.server
 
-import java.util.Properties
+import kafka.utils.TestInfoUtils
 
+import java.util.Properties
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -37,11 +40,12 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topic1, numPartitions, servers.size, new Properties())
+    createTopic(topic1, numPartitions, brokers.size, new Properties())
   }
 
-  @Test
-  def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(quorum: String): Unit = {
     // The basic idea is that we have one unknown topic and one created topic. We should get the 'UNKNOWN_TOPIC_OR_PARTITION'
     // error for the unknown topic and the 'OPERATION_NOT_ATTEMPTED' error for the known and authorized topic.
     val nonExistentTopic = new TopicPartition("unknownTopic", 0)
@@ -58,7 +62,7 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
       List(createdTopicPartition, nonExistentTopic).asJava)
       .build()
 
-    val leaderId = servers.head.config.brokerId
+    val leaderId = brokers.head.config.brokerId
     val response = connectAndReceive[AddPartitionsToTxnResponse](request, brokerSocketServer(leaderId))
 
     assertEquals(2, response.errors.size)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 7e5d791db25..1adf544819f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -17,15 +17,13 @@
 package kafka.server
 
 import java.util
-
 import java.util.concurrent.atomic.AtomicReference
-
-import kafka.utils.{CoreUtils, TestUtils}
-import kafka.server.QuorumTestHarness
+import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
 import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
-import org.junit.jupiter.api.Assertions.{assertEquals}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 
 object KafkaMetricsReporterTest {
@@ -43,52 +41,63 @@ object KafkaMetricsReporterTest {
     override def contextChange(metricsContext: MetricsContext): Unit = {
       //read jmxPrefix
 
-      MockMetricsReporter.JMXPREFIX.set(metricsContext.contextLabels().get("_namespace").toString)
-      MockMetricsReporter.CLUSTERID.set(metricsContext.contextLabels().get("kafka.cluster.id").toString)
-      MockMetricsReporter.BROKERID.set(metricsContext.contextLabels().get("kafka.broker.id").toString)
+      MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext))
+      MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext))
+      MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext))
+      MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext))
     }
 
-    override def configure(configs: util.Map[String, _]): Unit = {}
+    private def contextLabelOrNull(name: String, metricsContext: MetricsContext): String = {
+      Option(metricsContext.contextLabels().get(name)).flatMap(v => Option(v.toString())).getOrElse(null)
+    }
 
+    override def configure(configs: util.Map[String, _]): Unit = {}
   }
 
   object MockMetricsReporter {
     val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
     val BROKERID : AtomicReference[String] = new AtomicReference[String]
+    val NODEID : AtomicReference[String] = new AtomicReference[String]
     val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
   }
 }
 
 class KafkaMetricsReporterTest extends QuorumTestHarness {
-  var server: KafkaServer = null
+  var broker: KafkaBroker = null
   var config: KafkaConfig = null
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
     props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
     props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
-    props.setProperty(KafkaConfig.BrokerIdProp, "-1")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
     config = KafkaConfig.fromProps(props)
-    server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
-    server.startup()
+    broker = createBroker(config, threadNamePrefix = Option(this.getClass.getName))
+    broker.startup()
   }
 
-  @Test
-  def testMetricsContextNamespacePresent(): Unit = {
-    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID)
-    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID)
-    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX)
-    assertEquals("kafka.server", KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMetricsContextNamespacePresent(quorum: String): Unit = {
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
+    if (isKRaftTest()) {
+      assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+      assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
+    } else {
+      assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+      assertNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
+    }
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
 
-    server.shutdown()
+    broker.shutdown()
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
-    server.shutdown()
+    broker.shutdown()
     CoreUtils.delete(config.logDirs)
     super.tearDown()
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d0266bdee9d..1e0d5981dac 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.{KafkaFuture, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
@@ -67,7 +67,6 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaFuture, TopicPartition, Uuid}
 import org.apache.kafka.controller.QuorumController
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
 import org.apache.kafka.server.common.MetadataVersion
@@ -871,8 +870,59 @@ object TestUtils extends Logging {
    *         LeaderDuringDelete).
    * @throws AssertionError if the expected condition is not true within the timeout.
    */
-  def waitUntilLeaderIsElectedOrChanged(zkClient: KafkaZkClient, topic: String, partition: Int, timeoutMs: Long = 30000L,
-                                        oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Int = {
+  def waitUntilLeaderIsElectedOrChanged(
+    zkClient: KafkaZkClient,
+    topic: String,
+    partition: Int,
+    timeoutMs: Long = 30000L,
+    oldLeaderOpt: Option[Int] = None,
+    newLeaderOpt: Option[Int] = None
+  ): Int = {
+    def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
+      zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
+    }
+    doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
+  }
+
+  /**
+   *  If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
+   *  If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
+   *  If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
+   *
+   * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
+   *         LeaderDuringDelete).
+   * @throws AssertionError if the expected condition is not true within the timeout.
+   */
+  def waitUntilLeaderIsElectedOrChangedWithAdmin(
+    admin: Admin,
+    topic: String,
+    partition: Int,
+    timeoutMs: Long = 30000L,
+    oldLeaderOpt: Option[Int] = None,
+    newLeaderOpt: Option[Int] = None
+  ): Int = {
+    def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
+      admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic).partitions().asScala.
+        find(_.partition() == partition).
+        flatMap { p =>
+          if (p.leader().id() == Node.noNode().id()) {
+            None
+          } else {
+            Some(p.leader().id())
+          }
+        }
+    }
+    doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
+  }
+
+  private def doWaitUntilLeaderIsElectedOrChanged(
+    getPartitionLeader: (String, Int) => Option[Int],
+    topic: String,
+    partition: Int,
+    timeoutMs: Long,
+    oldLeaderOpt: Option[Int],
+    newLeaderOpt: Option[Int]
+  ): Int = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
     val startTime = System.currentTimeMillis()
     val topicPartition = new TopicPartition(topic, partition)
@@ -884,7 +934,7 @@ object TestUtils extends Logging {
     var electedOrChangedLeader: Option[Int] = None
     while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {
       // check if leader is elected
-      leader = zkClient.getLeaderForPartition(topicPartition)
+      leader = getPartitionLeader(topic, partition)
       leader match {
         case Some(l) => (newLeaderOpt, oldLeaderOpt) match {
           case (Some(newLeader), _) if newLeader == l =>


[kafka] 05/07: KAFKA-13166 Fix missing ControllerApis error handling (#12403)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 480e97914e1146ba79ae883c4e987f5de20702cb
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue Jul 26 19:08:59 2022 -0400

    KAFKA-13166 Fix missing ControllerApis error handling (#12403)
    
    Makes all ControllerApis request handlers return a `CompletableFuture[Unit]`. Also adds an additional completion stage which ensures we capture errors thrown during response building.
    
    Reviewed-by: Colin P. McCabe <cm...@apache.org>
---
 core/src/main/scala/kafka/server/AclApis.scala     |  32 ++++--
 .../main/scala/kafka/server/ControllerApis.scala   | 121 ++++++++++++---------
 .../unit/kafka/server/ControllerApisTest.scala     |  30 +++++
 3 files changed, 124 insertions(+), 59 deletions(-)
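
In isolation, the pattern this commit introduces looks roughly like the following self-contained sketch (plain Scala; the object and method names are invented for illustration, the real change is in the ControllerApis diff below). Each handler returns a CompletableFuture[Unit], and the dispatcher attaches a final completion stage that logs the failure and unwraps the CompletionException so the error response is built from the real cause.

    import java.util.concurrent.{CompletableFuture, CompletionException}

    object CompletionStageErrorHandlingSketch {

      // Stand-in for a request handler that now returns CompletableFuture[Unit].
      // An exception thrown while building the response completes the future
      // exceptionally instead of escaping the request handler thread.
      def handleExampleRequest(payload: String): CompletableFuture[Unit] = {
        CompletableFuture
          .supplyAsync[String](() => payload)
          .thenApply[Unit] { body =>
            if (body.isEmpty) throw new IllegalStateException("failed to build response")
            println(s"sent: $body")
          }
      }

      def main(args: Array[String]): Unit = {
        val handlerFuture: CompletableFuture[Unit] = handleExampleRequest("")
        // The extra completion stage: log the failure and unwrap the
        // CompletionException before building the error response, mirroring
        // what ControllerApis.handle does in the diff below.
        handlerFuture.whenComplete { (_, exception) =>
          if (exception != null) {
            val actual = if (exception.isInstanceOf[CompletionException]) exception.getCause else exception
            println(s"building error response for: $actual")
          }
        }.exceptionally(_ => ()).join() // swallow after handling so the demo exits cleanly
      }
    }
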

diff --git a/core/src/main/scala/kafka/server/AclApis.scala b/core/src/main/scala/kafka/server/AclApis.scala
index 97b685bc0aa..485cafeca20 100644
--- a/core/src/main/scala/kafka/server/AclApis.scala
+++ b/core/src/main/scala/kafka/server/AclApis.scala
@@ -24,14 +24,16 @@ import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclBinding
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.server.authorizer._
-import java.util
 
+import java.util
+import java.util.concurrent.CompletableFuture
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
@@ -53,7 +55,7 @@ class AclApis(authHelper: AuthHelper,
 
   def close(): Unit = alterAclsPurgatory.shutdown()
 
-  def handleDescribeAcls(request: RequestChannel.Request): Unit = {
+  def handleDescribeAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, DESCRIBE)
     val describeAclsRequest = request.body[DescribeAclsRequest]
     authorizer match {
@@ -74,9 +76,10 @@ class AclApis(authHelper: AuthHelper,
             .setResources(DescribeAclsResponse.aclsResources(returnedAcls)),
           describeAclsRequest.version))
     }
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  def handleCreateAcls(request: RequestChannel.Request): Unit = {
+  def handleCreateAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, ALTER)
     val createAclsRequest = request.body[CreateAclsRequest]
 
@@ -84,6 +87,7 @@ class AclApis(authHelper: AuthHelper,
       case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         createAclsRequest.getErrorResponse(requestThrottleMs,
           new SecurityDisabledException("No Authorizer is configured.")))
+        CompletableFuture.completedFuture[Unit](())
       case Some(auth) =>
         val allBindings = createAclsRequest.aclCreations.asScala.map(CreateAclsRequest.aclBinding)
         val errorResults = mutable.Map[AclBinding, AclCreateResult]()
@@ -103,6 +107,7 @@ class AclApis(authHelper: AuthHelper,
             validBindings += acl
         }
 
+        val future = new CompletableFuture[util.List[AclCreationResult]]()
         val createResults = auth.createAcls(request.context, validBindings.asJava).asScala.map(_.toCompletableFuture)
 
         def sendResponseCallback(): Unit = {
@@ -117,17 +122,20 @@ class AclApis(authHelper: AuthHelper,
             }
             creationResult
           }
+          future.complete(aclCreationResults.asJava)
+        }
+        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, sendResponseCallback)
+
+        future.thenApply[Unit] { aclCreationResults =>
           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
             new CreateAclsResponse(new CreateAclsResponseData()
               .setThrottleTimeMs(requestThrottleMs)
-              .setResults(aclCreationResults.asJava)))
+              .setResults(aclCreationResults)))
         }
-
-        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, sendResponseCallback)
     }
   }
 
-  def handleDeleteAcls(request: RequestChannel.Request): Unit = {
+  def handleDeleteAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, ALTER)
     val deleteAclsRequest = request.body[DeleteAclsRequest]
     authorizer match {
@@ -135,13 +143,20 @@ class AclApis(authHelper: AuthHelper,
         requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
           deleteAclsRequest.getErrorResponse(requestThrottleMs,
             new SecurityDisabledException("No Authorizer is configured.")))
+        CompletableFuture.completedFuture[Unit](())
       case Some(auth) =>
 
+        val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
         val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
           .asScala.map(_.toCompletableFuture).toList
 
         def sendResponseCallback(): Unit = {
           val filterResults = deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
+          future.complete(filterResults)
+        }
+
+        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, sendResponseCallback)
+        future.thenApply[Unit] { filterResults =>
           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
             new DeleteAclsResponse(
               new DeleteAclsResponseData()
@@ -149,7 +164,6 @@ class AclApis(authHelper: AuthHelper,
                 .setFilterResults(filterResults),
               deleteAclsRequest.version))
         }
-        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, sendResponseCallback)
     }
   }
-}
+ }
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 74bc4dd4067..efb6a36c3db 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.util
 import java.util.{Collections, OptionalLong}
 import java.util.Map.Entry
-import java.util.concurrent.{CompletableFuture, ExecutionException}
+import java.util.concurrent.{CompletableFuture, CompletionException}
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
@@ -78,7 +78,7 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     try {
-      request.header.apiKey match {
+      val handlerFuture: CompletableFuture[Unit] = request.header.apiKey match {
         case ApiKeys.FETCH => handleFetch(request)
         case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
         case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
@@ -109,10 +109,24 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
         case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
       }
+
+      // This catches exceptions in the future and subsequent completion stages returned by the request handlers.
+      handlerFuture.whenComplete { (_, exception) =>
+        if (exception != null) {
+          // CompletionException does not include the stack frames in its "cause" exception, so we need to
+          // log the original exception here
+          error(s"Unexpected error handling request ${request.requestDesc(true)} " +
+            s"with context ${request.context}", exception)
+
+          // For building the correct error request, we do need send the "cause" exception
+          val actualException = if (exception.isInstanceOf[CompletionException]) exception.getCause else exception
+          requestHelper.handleError(request, actualException)
+        }
+      }
     } catch {
       case e: FatalExitError => throw e
-      case e: Throwable => {
-        val t = if (e.isInstanceOf[ExecutionException]) e.getCause else e
+      case t: Throwable => {
+        // This catches exceptions in the blocking parts of the request handlers
         error(s"Unexpected error handling request ${request.requestDesc(true)} " +
           s"with context ${request.context}", t)
         requestHelper.handleError(request, t)
@@ -125,38 +139,41 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleEnvelopeRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleEnvelopeRequest(request: RequestChannel.Request, requestLocal: RequestLocal): CompletableFuture[Unit] = {
     if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
       requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
         s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
     } else {
       EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
     }
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  def handleSaslHandshakeRequest(request: RequestChannel.Request): Unit = {
+  def handleSaslHandshakeRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val responseData = new SaslHandshakeResponseData().setErrorCode(ILLEGAL_SASL_STATE.code)
     requestHelper.sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  def handleSaslAuthenticateRequest(request: RequestChannel.Request): Unit = {
+  def handleSaslAuthenticateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val responseData = new SaslAuthenticateResponseData()
       .setErrorCode(ILLEGAL_SASL_STATE.code)
       .setErrorMessage("SaslAuthenticate request received after successful authentication")
     requestHelper.sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(responseData))
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  def handleFetch(request: RequestChannel.Request): Unit = {
+  def handleFetch(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     handleRaftRequest(request, response => new FetchResponse(response.asInstanceOf[FetchResponseData]))
   }
 
-  def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
+  def handleFetchSnapshot(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
   }
 
-  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+  def handleDeleteTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val deleteTopicsRequest = request.body[DeleteTopicsRequest]
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs))
@@ -166,7 +183,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
       names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
       names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
-    future.whenComplete { (results, exception) =>
+    future.handle[Unit] { (results, exception) =>
       requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
         if (exception != null) {
           deleteTopicsRequest.getErrorResponse(throttleTimeMs, exception)
@@ -320,7 +337,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleCreateTopics(request: RequestChannel.Request): Unit = {
+  def handleCreateTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val createTopicsRequest = request.body[CreateTopicsRequest]
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs))
@@ -330,7 +347,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity),
         names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
             names, logIfDenied = false)(identity))
-    future.whenComplete { (result, exception) =>
+    future.handle[Unit] { (result, exception) =>
       requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
         if (exception != null) {
           createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
@@ -392,7 +409,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleApiVersionsRequest(request: RequestChannel.Request): Unit = {
+  def handleApiVersionsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     // Note that broker returns its full list of supported ApiKeys and versions regardless of current
     // authentication state (e.g., before SASL authentication on an SASL listener, do note that no
     // Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished).
@@ -410,6 +427,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
+    CompletableFuture.completedFuture[Unit](())
   }
 
   def authorizeAlterResource(requestContext: RequestContext,
@@ -431,7 +449,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleLegacyAlterConfigs(request: RequestChannel.Request): Unit = {
+  def handleLegacyAlterConfigs(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val response = new AlterConfigsResponseData()
     val alterConfigsRequest = request.body[AlterConfigsRequest]
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
@@ -474,7 +492,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
     }
     controller.legacyAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
-      .whenComplete { (controllerResults, exception) =>
+      .handle[Unit] { (controllerResults, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
         } else {
@@ -490,33 +508,33 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
   }
 
-  def handleVote(request: RequestChannel.Request): Unit = {
+  def handleVote(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     handleRaftRequest(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData]))
   }
 
-  def handleBeginQuorumEpoch(request: RequestChannel.Request): Unit = {
+  def handleBeginQuorumEpoch(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     handleRaftRequest(request, response => new BeginQuorumEpochResponse(response.asInstanceOf[BeginQuorumEpochResponseData]))
   }
 
-  def handleEndQuorumEpoch(request: RequestChannel.Request): Unit = {
+  def handleEndQuorumEpoch(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     handleRaftRequest(request, response => new EndQuorumEpochResponse(response.asInstanceOf[EndQuorumEpochResponseData]))
   }
 
-  def handleDescribeQuorum(request: RequestChannel.Request): Unit = {
+  def handleDescribeQuorum(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, DESCRIBE)
     handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
   }
 
-  def handleElectLeaders(request: RequestChannel.Request): Unit = {
+  def handleElectLeaders(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, ALTER)
     val electLeadersRequest = request.body[ElectLeadersRequest]
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       requestTimeoutMsToDeadlineNs(time, electLeadersRequest.data.timeoutMs))
     val future = controller.electLeaders(context, electLeadersRequest.data)
-    future.whenComplete { (responseData, exception) =>
+    future.handle[Unit] { (responseData, exception) =>
       if (exception != null) {
         requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
           electLeadersRequest.getErrorResponse(throttleMs, exception)
@@ -529,13 +547,13 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
+  def handleAlterPartitionRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val alterPartitionRequest = request.body[AlterPartitionRequest]
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val future = controller.alterPartition(context, alterPartitionRequest.data)
-    future.whenComplete { (result, exception) =>
+    future.handle[Unit] { (result, exception) =>
       val response = if (exception != null) {
         alterPartitionRequest.getErrorResponse(exception)
       } else {
@@ -545,7 +563,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
+  def handleBrokerHeartBeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[BrokerHeartbeatRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -572,7 +590,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleUnregisterBroker(request: RequestChannel.Request): Unit = {
+  def handleUnregisterBroker(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val decommissionRequest = request.body[UnregisterBrokerRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -595,7 +613,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
+  def handleBrokerRegistration(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val registrationRequest = request.body[BrokerRegistrationRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -622,11 +640,10 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   private def handleRaftRequest(request: RequestChannel.Request,
-                                buildResponse: ApiMessage => AbstractResponse): Unit = {
+                                buildResponse: ApiMessage => AbstractResponse): CompletableFuture[Unit] = {
     val requestBody = request.body[AbstractRequest]
     val future = raftManager.handleRequest(request.header, requestBody.data, time.milliseconds())
-
-    future.whenComplete { (responseData, exception) =>
+    future.handle[Unit] { (responseData, exception) =>
       val response = if (exception != null) {
         requestBody.getErrorResponse(exception)
       } else {
@@ -636,13 +653,13 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
+  def handleAlterClientQuotas(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val quotaRequest = request.body[AlterClientQuotasRequest]
     authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
     controller.alterClientQuotas(context, quotaRequest.entries, quotaRequest.validateOnly)
-      .whenComplete { (results, exception) =>
+      .handle[Unit] { (results, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
         } else {
@@ -652,7 +669,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
   }
 
-  def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
+  def handleIncrementalAlterConfigs(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val response = new IncrementalAlterConfigsResponseData()
     val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -700,7 +717,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
     }
     controller.incrementalAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
-      .whenComplete { (controllerResults, exception) =>
+      .handle[Unit] { (controllerResults, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
         } else {
@@ -716,7 +733,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
   }
 
-  def handleCreatePartitions(request: RequestChannel.Request): Unit = {
+  def handleCreatePartitions(request: RequestChannel.Request): CompletableFuture[Unit] = {
     def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
       authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
     }
@@ -726,7 +743,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     val future = createPartitions(context,
       createPartitionsRequest.data(),
       filterAlterAuthorizedTopics)
-    future.whenComplete { (responses, exception) =>
+    future.handle[Unit] { (responses, exception) =>
       if (exception != null) {
         requestHelper.handleError(request, exception)
       } else {
@@ -778,33 +795,37 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleAlterPartitionReassignments(request: RequestChannel.Request): Unit = {
+  def handleAlterPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val alterRequest = request.body[AlterPartitionReassignmentsRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       requestTimeoutMsToDeadlineNs(time, alterRequest.data.timeoutMs))
-    val response = controller.alterPartitionReassignments(context, alterRequest.data).get()
-    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-      new AlterPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+    controller.alterPartitionReassignments(context, alterRequest.data)
+      .thenApply[Unit] { response =>
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new AlterPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+      }
   }
 
-  def handleListPartitionReassignments(request: RequestChannel.Request): Unit = {
+  def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val listRequest = request.body[ListPartitionReassignmentsRequest]
     authHelper.authorizeClusterOperation(request, DESCRIBE)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
-    val response = controller.listPartitionReassignments(context, listRequest.data).get()
-    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-      new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+    controller.listPartitionReassignments(context, listRequest.data)
+      .thenApply[Unit] { response =>
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+      }
   }
 
-  def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
+  def handleAllocateProducerIdsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
         OptionalLong.empty())
     controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
-      .whenComplete((results, exception) => {
+      .handle[Unit] { (results, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
         } else {
@@ -813,22 +834,22 @@ class ControllerApis(val requestChannel: RequestChannel,
             new AllocateProducerIdsResponse(results)
           })
         }
-      })
+      }
   }
 
-  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+  def handleUpdateFeatures(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
       OptionalLong.empty())
     controller.updateFeatures(context, updateFeaturesRequest.data)
-      .whenComplete((response, exception) => {
+      .handle[Unit] { (response, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
         } else {
           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
             new UpdateFeaturesResponse(response.setThrottleTimeMs(requestThrottleMs)))
         }
-      })
+      }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 86a0c854705..0fc96114527 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -63,6 +63,7 @@ import java.net.InetAddress
 import java.util
 import java.util.Collections.singletonList
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Properties}
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -902,6 +903,35 @@ class ControllerApisTest {
     }
   }
 
+  @Test
+  def testCompletableFutureExceptions(): Unit = {
+    // This test simulates an error in a completable future as we return from the controller. We need to ensure
+    // that any exception thrown in the completion phase is properly captured and translated to an error response.
+    val request = buildRequest(new FetchRequest(new FetchRequestData(), 12))
+    val response = new FetchResponseData()
+    val responseFuture = new CompletableFuture[ApiMessage]()
+    val errorResponseFuture = new AtomicReference[AbstractResponse]()
+    when(raftManager.handleRequest(any(), any(), any())).thenReturn(responseFuture)
+    when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
+      // Simulate an encoding failure in the initial fetch response
+      throw new UnsupportedVersionException("Something went wrong")
+    }.thenAnswer { invocation =>
+      val resp = invocation.getArgument(1, classOf[AbstractResponse])
+      errorResponseFuture.set(resp)
+    }
+
+    // Calling handle does not block since we do not call get() in ControllerApis
+    createControllerApis(None,
+      new MockController.Builder().build()).handle(request, null)
+
+    // When we complete this future, the completion stages will fire (including the error handler in ControllerApis#request)
+    responseFuture.complete(response)
+
+    // Now we should get an error response with UNSUPPORTED_VERSION
+    val errorResponse = errorResponseFuture.get()
+    assertEquals(1, errorResponse.errorCounts().getOrDefault(Errors.UNSUPPORTED_VERSION, 0))
+  }
+
   @AfterEach
   def tearDown(): Unit = {
     quotas.shutdown()


[kafka] 07/07: KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8c8cb111a4fb6f17d80fb55393ff4ffff03828a6
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Jul 21 19:59:05 2022 -0400

    KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)
    
    KRaft remote controllers do not yet support dynamic reconfiguration (https://issues.apache.org/jira/browse/KAFKA-14057). Until that is implemented, the instantiation of the configured metric reporters happens only as part of the wiring for dynamic reconfiguration. Since that wiring does not yet exist for KRaft remote controllers, this patch refactors the instantiation of the metric reporters out of their reconfiguration and adjusts the contro [...]
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
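
For context, what this startup path instantiates are ordinary implementations of
org.apache.kafka.common.metrics.MetricsReporter listed in the metric.reporters config.
Below is a minimal, illustrative Scala sketch of such a reporter; the package and class
names are invented for illustration. With this patch, a controller-only node
(process.roles=controller) would create a reporter like this at startup rather than
through the dynamic-reconfiguration wiring that remote controllers do not yet have.

    package com.example.metrics  // hypothetical package

    import java.util

    import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

    // A bare-bones reporter that just logs metric lifecycle events to stdout.
    class LoggingMetricsReporter extends MetricsReporter {
      override def configure(configs: util.Map[String, _]): Unit = ()

      override def init(metrics: util.List[KafkaMetric]): Unit =
        metrics.forEach(m => println(s"registered metric ${m.metricName()}"))

      override def metricChange(metric: KafkaMetric): Unit =
        println(s"metric changed: ${metric.metricName()}")

      override def metricRemoval(metric: KafkaMetric): Unit =
        println(s"metric removed: ${metric.metricName()}")

      override def close(): Unit = ()
    }

A remote controller that lists such a class in metric.reporters would, after this patch,
create and initialize it during ControllerServer startup (see doRemoteKraftSetup in the
diff below) instead of skipping it entirely.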
---
 .../main/scala/kafka/server/ControllerServer.scala | 15 +++++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 46 +++++++++++++++-------
 2 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 28c98643c3a..212c092e1ab 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -27,6 +27,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
+import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.clients.ApiVersions
@@ -70,6 +71,8 @@ class ControllerServer(
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
+  config.dynamicConfig.initialize(zkClientOpt = None)
+
   val lock = new ReentrantLock()
   val awaitShutdownCond = lock.newCondition()
   var status: ProcessStatus = SHUTDOWN
@@ -99,6 +102,13 @@ class ControllerServer(
     true
   }
 
+  private def doRemoteKraftSetup(): Unit = {
+    // Explicitly configure metric reporters on this remote controller.
+    // We do not yet support dynamic reconfiguration on remote controllers in general;
+    // remove this once that is implemented.
+    new DynamicMetricReporterState(config.nodeId, config, metrics, clusterId)
+  }
+
   def clusterId: String = metaProperties.clusterId
 
   def startup(): Unit = {
@@ -206,6 +216,11 @@ class ControllerServer(
       }
       controller = controllerBuilder.build()
 
+      // Perform any setup that is done only when this node is a controller-only node.
+      if (!config.processRoles.contains(BrokerRole)) {
+        doRemoteKraftSetup()
+      }
+
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 76a42b74fa5..a860938124e 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SslConfigs}
-import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
@@ -258,7 +258,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       case _ =>
     }
     addReconfigurable(kafkaServer.kafkaYammerMetrics)
-    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
 
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
@@ -748,17 +748,18 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
   }
 }
 
-class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconfigurable {
+class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) extends Reconfigurable {
+  private val reporterState = new DynamicMetricReporterState(brokerId, config, metrics, clusterId)
+  private val currentReporters = reporterState.currentReporters
+  private val dynamicConfig = reporterState.dynamicConfig
 
-  private val dynamicConfig = server.config.dynamicConfig
-  private val metrics = server.metrics
-  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
-  private val currentReporters = mutable.Map[String, MetricsReporter]()
+  private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] =
+    reporterState.metricsReporterClasses(configs)
 
-  createReporters(dynamicConfig.currentKafkaConfig.getList(KafkaConfig.MetricReporterClassesProp),
-    Collections.emptyMap[String, Object])
+  private def createReporters(reporterClasses: util.List[String], updatedConfigs: util.Map[String, _]): Unit =
+    reporterState.createReporters(reporterClasses, updatedConfigs)
 
-  private[server] def currentMetricsReporters: List[MetricsReporter] = currentReporters.values.toList
+  private def removeReporter(className: String): Unit = reporterState.removeReporter(className)
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
@@ -801,8 +802,23 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     val added = updatedMetricsReporters.filterNot(currentReporters.keySet)
     createReporters(added.asJava, configs)
   }
+}
+
+class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) {
+  private[server] val dynamicConfig = config.dynamicConfig
+  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
+  private[server] val currentReporters = mutable.Map[String, MetricsReporter]()
+  createReporters(config, clusterId, metricsReporterClasses(dynamicConfig.currentKafkaConfig.values()).asJava,
+    Collections.emptyMap[String, Object])
+
+  private[server] def createReporters(reporterClasses: util.List[String],
+                                      updatedConfigs: util.Map[String, _]): Unit = {
+    createReporters(config, clusterId, reporterClasses, updatedConfigs)
+  }
 
-  private def createReporters(reporterClasses: util.List[String],
+  private def createReporters(config: KafkaConfig,
+                              clusterId: String,
+                              reporterClasses: util.List[String],
                               updatedConfigs: util.Map[String, _]): Unit = {
     val props = new util.HashMap[String, AnyRef]
     updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
@@ -811,19 +827,19 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
     // which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
     // The first call to MetricsReporter.init is done when we call metrics.addReporter below.
-    KafkaBroker.notifyMetricsReporters(server.clusterId, server.config, reporters.asScala)
+    KafkaBroker.notifyMetricsReporters(clusterId, config, reporters.asScala)
     reporters.forEach { reporter =>
       metrics.addReporter(reporter)
       currentReporters += reporter.getClass.getName -> reporter
     }
-    KafkaBroker.notifyClusterListeners(server.clusterId, reporters.asScala)
+    KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
   }
 
-  private def removeReporter(className: String): Unit = {
+  private[server] def removeReporter(className: String): Unit = {
     currentReporters.remove(className).foreach(metrics.removeReporter)
   }
 
-  private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
+  private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
     configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
   }
 }


[kafka] 02/07: MINOR: add :server-common test dependency to :storage (#12488)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2ee58902d4cc23fd90459ed7fbc782be191f809a
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Aug 5 19:17:50 2022 -0700

    MINOR: add :server-common test dependency to :storage (#12488)
    
    Fix a bug in the KAFKA-14124 PR where a gradle test dependency was missing.
    This caused missing test class exceptions.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 build.gradle | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/build.gradle b/build.gradle
index aa930ce1bff..c154ece6914 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1597,6 +1597,8 @@ project(':storage') {
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':core')
     testImplementation project(':core').sourceSets.test.output
+    testImplementation project(':server-common')
+    testImplementation project(':server-common').sourceSets.test.output
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
     testImplementation libs.bcpkix


[kafka] 01/07: KAFKA-14124: improve quorum controller fault handling (#12447)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8db3f0998c61bd45a57ca60d17ec238f9864b7b1
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Aug 4 22:49:45 2022 -0700

    KAFKA-14124: improve quorum controller fault handling (#12447)
    
    Before trying to commit a batch of records to the __cluster_metadata log, the active controller
    should try to apply them to its current in-memory state. If this application process fails, the
    active controller process should exit, allowing another node to take leadership. This will prevent
    most bad metadata records from ending up in the log and help to surface errors during testing.
    
    Similarly, if the active controller attempts to renounce leadership, and the renunciation process
    itself fails, the process should exit. This will help avoid bugs where the active controller
    continues in an undefined state.
    
    In contrast, standby controllers that experience metadata application errors should continue on, in
    order to avoid a scenario where a bad record brings down the whole controller cluster.  The
    intended effect of these changes is to make it harder to commit a bad record to the metadata log,
    but to continue to ride out the bad record as well as possible if such a record does get committed.
    
    This PR introduces the FaultHandler interface to implement these concepts. In junit tests, we use a
    FaultHandler implementation which does not exit the process. This allows us to avoid terminating
    the gradle test runner, which would be very disruptive. It also lets us ensure that the tests
    surface these exceptions, which previously was not the case (the mock fault handler stores the
    exception so it can be rethrown later).
    
    In addition to the above, this PR fixes a bug where RaftClient#resign was not being called from the
    renounce() function. This bug could have resulted in the raft layer not being informed of an active
    controller resigning.
    
    Reviewers: David Arthur <mu...@gmail.com>
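
To make the division of responsibilities above concrete, here is a self-contained Scala
sketch of the two kinds of handlers described: one that exits the process for fatal faults
on the active controller, and one for tests that records the first fault so a harness can
rethrow it after shutdown (compare MockFaultHandler.maybeRethrowFirstException in the diff
below). The trait here is a simplified stand-in, not the actual
org.apache.kafka.server.fault.FaultHandler interface.

    import java.util.concurrent.atomic.AtomicReference

    // Simplified stand-in for the fault handler concept introduced by this patch.
    trait SketchFaultHandler {
      def handleFault(message: String, cause: Throwable): Unit
    }

    // Fatal faults on the active controller: log and terminate the process so that
    // another node can take over leadership.
    class ExitingFaultHandler extends SketchFaultHandler {
      override def handleFault(message: String, cause: Throwable): Unit = {
        System.err.println(s"FATAL: $message")
        cause.printStackTrace()
        Runtime.getRuntime.halt(1)
      }
    }

    // Test environments: remember the first fault instead of exiting, so the test
    // harness can rethrow it once the cluster has been shut down.
    class RecordingFaultHandler extends SketchFaultHandler {
      private val firstFault = new AtomicReference[Throwable]()

      override def handleFault(message: String, cause: Throwable): Unit =
        firstFault.compareAndSet(null, new RuntimeException(message, cause))

      def maybeRethrowFirstFault(): Unit = {
        val t = firstFault.get()
        if (t != null) throw t
      }
    }

In the patch itself, the active controller's write path invokes the fatal handler when a
record cannot be applied to in-memory state before it is handed to Raft, while the standby
replay and snapshot paths invoke the metadata handler so that a single bad record does not
take down every controller in the quorum.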
---
 build.gradle                                       |   2 +
 checkstyle/import-control-core.xml                 |   1 +
 checkstyle/import-control.xml                      |   4 +
 checkstyle/suppressions.xml                        |   2 +
 .../main/scala/kafka/server/ControllerServer.scala |  10 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   6 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  36 +-
 .../kafka/server/QuorumTestHarness.scala           |   6 +
 .../apache/kafka/controller/QuorumController.java  | 382 ++++++++++++---------
 .../metadata/fault/MetadataFaultException.java     |  32 ++
 .../kafka/metadata/fault/MetadataFaultHandler.java |  36 ++
 .../kafka/controller/QuorumControllerTest.java     |  25 ++
 .../kafka/controller/QuorumControllerTestEnv.java  |  15 +
 .../apache/kafka/server/fault/FaultHandler.java    |  58 ++++
 .../server/fault/ProcessExitingFaultHandler.java   |  37 ++
 .../kafka/server/fault/MockFaultHandler.java       |  65 ++++
 .../server/fault/MockFaultHandlerException.java    |  38 ++
 17 files changed, 586 insertions(+), 169 deletions(-)

diff --git a/build.gradle b/build.gradle
index 73c36c31170..aa930ce1bff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -875,6 +875,7 @@ project(':core') {
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':metadata').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
     testImplementation(libs.apacheda) {
@@ -1157,6 +1158,7 @@ project(':metadata') {
     testImplementation libs.slf4jlog4j
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
     generator project(':generator')
   }
 
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 28b325b093d..4042cba402f 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -54,6 +54,7 @@
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
   </subpackage>
 
   <subpackage name="tools">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 211d23ff60a..4b07a26cba5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -232,6 +232,7 @@
     <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.authorizer" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
     <allow pkg="org.apache.kafka.server.metrics" />
     <allow pkg="org.apache.kafka.server.policy"/>
     <allow pkg="org.apache.kafka.snapshot" />
@@ -276,6 +277,9 @@
       <allow pkg="org.apache.kafka.controller" />
       <allow pkg="org.apache.kafka.metadata" />
     </subpackage>
+    <subpackage name="fault">
+      <allow pkg="org.apache.kafka.server.fault" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="metalog">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f6ca0d02fe3..bec3da1637a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,8 @@
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
+    <suppress checks="MethodLength"
+              files="(KafkaClusterTestKit).java"/>
 
     <!-- Clients -->
     <suppress id="dontUseSystemExit"
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 67b3f0276d7..28c98643c3a 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.OptionalLong
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, TimeUnit}
-
 import kafka.cluster.Broker.ServerInfo
 import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
@@ -45,6 +44,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 
@@ -64,7 +64,9 @@ class ControllerServer(
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
   val configSchema: KafkaConfigSchema,
   val raftApiVersions: ApiVersions,
-  val bootstrapMetadata: BootstrapMetadata
+  val bootstrapMetadata: BootstrapMetadata,
+  val metadataFaultHandler: FaultHandler,
+  val fatalFaultHandler: FaultHandler,
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
@@ -194,7 +196,9 @@ class ControllerServer(
           setAlterConfigPolicy(alterConfigPolicy.asJava).
           setConfigurationValidator(new ControllerConfigurationValidator()).
           setStaticConfig(config.originals).
-          setBootstrapMetadata(bootstrapMetadata)
+          setBootstrapMetadata(bootstrapMetadata).
+          setMetadataFaultHandler(metadataFaultHandler).
+          setFatalFaultHandler(fatalFaultHandler)
       }
       authorizer match {
         case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 07a31183720..e7cf8f8f1fa 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -29,9 +29,11 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.metadata.fault.MetadataFaultHandler
 import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.nio.file.Paths
@@ -106,7 +108,9 @@ class KafkaRaftServer(
       controllerQuorumVotersFuture,
       KafkaRaftServer.configSchema,
       raftManager.apiVersions,
-      bootstrapMetadata
+      bootstrapMetadata,
+      new MetadataFaultHandler(),
+      new ProcessExitingFaultHandler(),
     ))
   } else {
     None
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index c961d71bbe5..42120324f5f 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -40,6 +40,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
     public static class Builder {
         private TestKitNodes nodes;
         private Map<String, String> configProps = new HashMap<>();
+        private MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
+        private MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
 
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
@@ -190,7 +193,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         connectFutureManager.future,
                         KafkaRaftServer.configSchema(),
                         raftManager.apiVersions(),
-                        bootstrapMetadata
+                        bootstrapMetadata,
+                        metadataFaultHandler,
+                        fatalFaultHandler
                     );
                     controllers.put(node.id(), controller);
                     controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@@ -273,7 +278,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 throw e;
             }
             return new KafkaClusterTestKit(executorService, nodes, controllers,
-                brokers, raftManagers, connectFutureManager, baseDirectory);
+                brokers, raftManagers, connectFutureManager, baseDirectory,
+                metadataFaultHandler, fatalFaultHandler);
         }
 
         private String listeners(int node) {
@@ -314,14 +320,20 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers;
     private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
     private final File baseDirectory;
-
-    private KafkaClusterTestKit(ExecutorService executorService,
-                                TestKitNodes nodes,
-                                Map<Integer, ControllerServer> controllers,
-                                Map<Integer, BrokerServer> brokers,
-                                Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
-                                ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
-                                File baseDirectory) {
+    private final MockFaultHandler metadataFaultHandler;
+    private final MockFaultHandler fatalFaultHandler;
+
+    private KafkaClusterTestKit(
+        ExecutorService executorService,
+        TestKitNodes nodes,
+        Map<Integer, ControllerServer> controllers,
+        Map<Integer, BrokerServer> brokers,
+        Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
+        ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
+        File baseDirectory,
+        MockFaultHandler metadataFaultHandler,
+        MockFaultHandler fatalFaultHandler
+    ) {
         this.executorService = executorService;
         this.nodes = nodes;
         this.controllers = controllers;
@@ -329,6 +341,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.raftManagers = raftManagers;
         this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
         this.baseDirectory = baseDirectory;
+        this.metadataFaultHandler = metadataFaultHandler;
+        this.fatalFaultHandler = fatalFaultHandler;
     }
 
     public void format() throws Exception {
@@ -520,6 +534,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
             executorService.shutdownNow();
             executorService.awaitTermination(5, TimeUnit.MINUTES);
         }
+        metadataFaultHandler.maybeRethrowFirstException();
+        fatalFaultHandler.maybeRethrowFirstException();
     }
 
     private void waitForAllFutures(List<Entry<String, Future<?>>> futureEntries)
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index b82f86a8cb3..766eb3a7e53 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.controller.BootstrapMetadata
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.MockFaultHandler
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
@@ -179,6 +180,8 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
+  val faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler")
+
   // Note: according to the junit documentation: "JUnit Jupiter does not guarantee the execution
   // order of multiple @BeforeEach methods that are declared within a single test class or test
   // interface." Therefore, if you have things you would like to do before each test case runs, it
@@ -296,6 +299,8 @@ abstract class QuorumTestHarness extends Logging {
         configSchema = KafkaRaftServer.configSchema,
         raftApiVersions = raftManager.apiVersions,
         bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
+        metadataFaultHandler = faultHandler,
+        fatalFaultHandler = faultHandler,
       )
       controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
         if (e != null) {
@@ -362,6 +367,7 @@ abstract class QuorumTestHarness extends Logging {
     }
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     Configuration.setConfiguration(null)
+    faultHandler.maybeRethrowFirstException()
   }
 
   // Trigger session expiry by reusing the session id in another client
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0290e0040c2..a4cc1d92cba 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -91,6 +91,7 @@ import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -149,6 +150,8 @@ public final class QuorumController implements Controller {
     static public class Builder {
         private final int nodeId;
         private final String clusterId;
+        private FaultHandler fatalFaultHandler = null;
+        private FaultHandler metadataFaultHandler = null;
         private Time time = Time.SYSTEM;
         private String threadNamePrefix = null;
         private LogContext logContext = null;
@@ -175,6 +178,16 @@ public final class QuorumController implements Controller {
             this.clusterId = clusterId;
         }
 
+        public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
+            this.fatalFaultHandler = fatalFaultHandler;
+            return this;
+        }
+
+        public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) {
+            this.metadataFaultHandler = metadataFaultHandler;
+            return this;
+        }
+
         public int nodeId() {
             return nodeId;
         }
@@ -287,6 +300,10 @@ public final class QuorumController implements Controller {
                 throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
             } else if (quorumFeatures == null) {
                 throw new IllegalStateException("You must specify the quorum features");
+            } else if (fatalFaultHandler == null) {
+                throw new IllegalStateException("You must specify a fatal fault handler.");
+            } else if (metadataFaultHandler == null) {
+                throw new IllegalStateException("You must specify a metadata fault handler.");
             }
 
             if (threadNamePrefix == null) {
@@ -304,6 +321,8 @@ public final class QuorumController implements Controller {
             try {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
                 return new QuorumController(
+                    fatalFaultHandler,
+                    metadataFaultHandler,
                     logContext,
                     nodeId,
                     clusterId,
@@ -426,12 +445,18 @@ public final class QuorumController implements Controller {
                 exception.getClass().getSimpleName(), deltaUs);
             return exception;
         }
-        log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  " +
-                "Renouncing leadership and reverting to the last committed offset {}.",
-            name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
-            lastCommittedOffset, exception);
-        raftClient.resign(curClaimEpoch);
-        renounce();
+        if (isActiveController()) {
+            log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  " +
+                    "Renouncing leadership and reverting to the last committed offset {}.",
+                    name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
+                    lastCommittedOffset, exception);
+            renounce();
+        } else {
+            log.warn("{}: failed with unknown server exception {} in {} us.  " +
+                    "The controller is already in standby mode.",
+                    name, exception.getClass().getSimpleName(), deltaUs,
+                    exception);
+        }
         return new UnknownServerException(exception);
     }
 
@@ -702,7 +727,7 @@ public final class QuorumController implements Controller {
             long now = time.nanoseconds();
             controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
             int controllerEpoch = curClaimEpoch;
-            if (controllerEpoch == -1) {
+            if (!isActiveController()) {
                 throw newNotControllerException();
             }
             startProcessingTimeNs = OptionalLong.of(now);
@@ -728,9 +753,26 @@ public final class QuorumController implements Controller {
                         "reaches offset {}", this, resultAndOffset.offset());
                 }
             } else {
-                // If the operation returned a batch of records, those records need to be
-                // written before we can return our result to the user.  Here, we hand off
-                // the batch of records to the raft client.  They will be written out
+                // Start by trying to apply the record to our in-memory state. This should always
+                // succeed; if it does not, that's a fatal error. It is important to do this before
+                // scheduling the record for Raft replication.
+                int i = 1;
+                for (ApiMessageAndVersion message : result.records()) {
+                    try {
+                        replay(message.message(), Optional.empty());
+                    } catch (Throwable e) {
+                        String failureMessage = String.format("Unable to apply %s record, which was " +
+                            "%d of %d record(s) in the batch following last writeOffset %d.",
+                            message.message().getClass().getSimpleName(), i, result.records().size(),
+                            writeOffset);
+                        fatalFaultHandler.handleFault(failureMessage, e);
+                    }
+                    i++;
+                }
+
+                // If the operation returned a batch of records, and those records could be applied,
+                // they need to be written before we can return our result to the user.  Here, we
+                // hand off the batch of records to the raft client.  They will be written out
                 // asynchronously.
                 final long offset;
                 if (result.isAtomic()) {
@@ -741,9 +783,6 @@ public final class QuorumController implements Controller {
                 op.processBatchEndOffset(offset);
                 updateWriteOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
-                for (ApiMessageAndVersion message : result.records()) {
-                    replay(message.message(), Optional.empty(), offset);
-                }
                 snapshotRegistry.getOrCreateSnapshot(offset);
 
                 log.debug("Read-write operation {} will be completed when the log " +
@@ -789,9 +828,9 @@ public final class QuorumController implements Controller {
         return event.future();
     }
 
-    private <T> CompletableFuture<T> appendWriteEvent(String name,
-                                                      OptionalLong deadlineNs,
-                                                      ControllerWriteOperation<T> op) {
+    <T> CompletableFuture<T> appendWriteEvent(String name,
+                                              OptionalLong deadlineNs,
+                                              ControllerWriteOperation<T> op) {
         ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
         if (deadlineNs.isPresent()) {
             queue.appendWithDeadline(deadlineNs.getAsLong(), event);
@@ -841,11 +880,20 @@ public final class QuorumController implements Controller {
                                         "offset {} and epoch {}.", offset, epoch);
                                 }
                             }
-                            for (ApiMessageAndVersion messageAndVersion : messages) {
-                                replay(messageAndVersion.message(), Optional.empty(), offset);
+                            int i = 1;
+                            for (ApiMessageAndVersion message : messages) {
+                                try {
+                                    replay(message.message(), Optional.empty());
+                                } catch (Throwable e) {
+                                    String failureMessage = String.format("Unable to apply %s record on standby " +
+                                            "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
+                                            message.message().getClass().getSimpleName(), i, messages.size(),
+                                            batch.baseOffset());
+                                    metadataFaultHandler.handleFault(failureMessage, e);
+                                }
+                                i++;
                             }
                         }
-
                         updateLastCommittedState(offset, epoch, batch.appendTimestamp());
                         processedRecordsSize += batch.sizeInBytes();
                     }
@@ -862,13 +910,9 @@ public final class QuorumController implements Controller {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                 try {
                     if (isActiveController()) {
-                        throw new IllegalStateException(
-                            String.format(
-                                "Asked to load snapshot (%s) when it is the active controller (%d)",
-                                reader.snapshotId(),
-                                curClaimEpoch
-                            )
-                        );
+                        fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
+                            "(%s) when it is the active controller (%d)", reader.snapshotId(),
+                            curClaimEpoch));
                     }
                     log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})",
                         reader.snapshotId(), lastCommittedOffset, lastCommittedEpoch);
@@ -882,26 +926,28 @@ public final class QuorumController implements Controller {
 
                         if (log.isDebugEnabled()) {
                             if (log.isTraceEnabled()) {
-                                log.trace(
-                                    "Replaying snapshot ({}) batch with last offset of {}: {}",
-                                    reader.snapshotId(),
-                                    offset,
-                                    messages
-                                        .stream()
-                                        .map(ApiMessageAndVersion::toString)
-                                        .collect(Collectors.joining(", "))
-                                );
+                                log.trace("Replaying snapshot ({}) batch with last offset of {}: {}",
+                                    reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).
+                                        collect(Collectors.joining(", ")));
                             } else {
-                                log.debug(
-                                    "Replaying snapshot ({}) batch with last offset of {}",
-                                    reader.snapshotId(),
-                                    offset
-                                );
+                                log.debug("Replaying snapshot ({}) batch with last offset of {}",
+                                    reader.snapshotId(), offset);
                             }
                         }
 
-                        for (ApiMessageAndVersion messageAndVersion : messages) {
-                            replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
+                        int i = 1;
+                        for (ApiMessageAndVersion message : messages) {
+                            try {
+                                replay(message.message(), Optional.of(reader.snapshotId()));
+                            } catch (Throwable e) {
+                                String failureMessage = String.format("Unable to apply %s record " +
+                                        "from snapshot %s on standby controller, which was %d of " +
+                                        "%d record(s) in the batch with baseOffset %d.",
+                                        message.message().getClass().getSimpleName(), reader.snapshotId(),
+                                        i, messages.size(), batch.baseOffset());
+                                metadataFaultHandler.handleFault(failureMessage, e);
+                            }
+                            i++;
                         }
                     }
                     updateLastCommittedState(
@@ -968,10 +1014,14 @@ public final class QuorumController implements Controller {
                             if (exception != null) {
                                 log.error("Failed to bootstrap metadata.", exception);
                                 appendRaftEvent("bootstrapMetadata[" + curClaimEpoch + "]", () -> {
-                                    log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
-                                             "metadata. Reverting to last committed offset {}.",
-                                        curClaimEpoch, lastCommittedOffset);
-                                    renounce();
+                                    if (isActiveController()) {
+                                        log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
+                                                        "metadata. Reverting to last committed offset {}.",
+                                                curClaimEpoch, lastCommittedOffset);
+                                        renounce();
+                                    } else {
+                                        log.warn("Unable to bootstrap metadata on standby controller.");
+                                    }
                                 });
                             }
                         });
@@ -998,10 +1048,12 @@ public final class QuorumController implements Controller {
                 });
             } else if (isActiveController()) {
                 appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
-                    log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
-                             "log event. Reverting to last committed offset {}.", curClaimEpoch,
-                        lastCommittedOffset);
-                    renounce();
+                    if (isActiveController()) {
+                        log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
+                                "log event. Reverting to last committed offset {}.", curClaimEpoch,
+                                lastCommittedOffset);
+                        renounce();
+                    }
                 });
             }
         }
@@ -1078,26 +1130,34 @@ public final class QuorumController implements Controller {
     }
 
     private void renounce() {
-        curClaimEpoch = -1;
-        controllerMetrics.setActive(false);
-        purgatory.failAll(newNotControllerException());
-
-        if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-            newBytesSinceLastSnapshot = 0;
-            snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-            authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-        } else {
-            resetState();
-            raftClient.unregister(metaLogListener);
-            metaLogListener = new QuorumMetaLogListener();
-            raftClient.register(metaLogListener);
+        try {
+            if (curClaimEpoch == -1) {
+                throw new RuntimeException("Cannot renounce leadership because we are not the " +
+                        "current leader.");
+            }
+            raftClient.resign(curClaimEpoch);
+            curClaimEpoch = -1;
+            controllerMetrics.setActive(false);
+            purgatory.failAll(newNotControllerException());
+
+            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                newBytesSinceLastSnapshot = 0;
+                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+                authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
+            } else {
+                resetState();
+                raftClient.unregister(metaLogListener);
+                metaLogListener = new QuorumMetaLogListener();
+                raftClient.register(metaLogListener);
+            }
+            updateWriteOffset(-1);
+            clusterControl.deactivate();
+            cancelMaybeFenceReplicas();
+            cancelMaybeBalancePartitionLeaders();
+            cancelNextWriteNoOpRecord();
+        } catch (Throwable e) {
+            fatalFaultHandler.handleFault("exception while renouncing leadership", e);
         }
-
-        updateWriteOffset(-1);
-        clusterControl.deactivate();
-        cancelMaybeFenceReplicas();
-        cancelMaybeBalancePartitionLeaders();
-        cancelNextWriteNoOpRecord();
     }
 
     private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
@@ -1246,70 +1306,60 @@ public final class QuorumController implements Controller {
     }
 
     @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
-        try {
-            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
-            switch (type) {
-                case REGISTER_BROKER_RECORD:
-                    clusterControl.replay((RegisterBrokerRecord) message);
-                    break;
-                case UNREGISTER_BROKER_RECORD:
-                    clusterControl.replay((UnregisterBrokerRecord) message);
-                    break;
-                case TOPIC_RECORD:
-                    replicationControl.replay((TopicRecord) message);
-                    break;
-                case PARTITION_RECORD:
-                    replicationControl.replay((PartitionRecord) message);
-                    break;
-                case CONFIG_RECORD:
-                    configurationControl.replay((ConfigRecord) message);
-                    break;
-                case PARTITION_CHANGE_RECORD:
-                    replicationControl.replay((PartitionChangeRecord) message);
-                    break;
-                case FENCE_BROKER_RECORD:
-                    clusterControl.replay((FenceBrokerRecord) message);
-                    break;
-                case UNFENCE_BROKER_RECORD:
-                    clusterControl.replay((UnfenceBrokerRecord) message);
-                    break;
-                case REMOVE_TOPIC_RECORD:
-                    replicationControl.replay((RemoveTopicRecord) message);
-                    break;
-                case FEATURE_LEVEL_RECORD:
-                    featureControl.replay((FeatureLevelRecord) message);
-                    handleFeatureControlChange();
-                    break;
-                case CLIENT_QUOTA_RECORD:
-                    clientQuotaControlManager.replay((ClientQuotaRecord) message);
-                    break;
-                case PRODUCER_IDS_RECORD:
-                    producerIdControlManager.replay((ProducerIdsRecord) message);
-                    break;
-                case BROKER_REGISTRATION_CHANGE_RECORD:
-                    clusterControl.replay((BrokerRegistrationChangeRecord) message);
-                    break;
-                case ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
-                    break;
-                case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
-                    break;
-                case NO_OP_RECORD:
-                    // NoOpRecord is an empty record and doesn't need to be replayed
-                    break;
-                default:
-                    throw new RuntimeException("Unhandled record type " + type);
-            }
-        } catch (Exception e) {
-            if (snapshotId.isPresent()) {
-                log.error("Error replaying record {} from snapshot {} at last offset {}.",
-                    message.toString(), snapshotId.get(), offset, e);
-            } else {
-                log.error("Error replaying record {} at last offset {}.",
-                    message.toString(), offset, e);
-            }
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                clusterControl.replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                clusterControl.replay((UnregisterBrokerRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replicationControl.replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replicationControl.replay((PartitionRecord) message);
+                break;
+            case CONFIG_RECORD:
+                configurationControl.replay((ConfigRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replicationControl.replay((PartitionChangeRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                clusterControl.replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                clusterControl.replay((UnfenceBrokerRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replicationControl.replay((RemoveTopicRecord) message);
+                break;
+            case FEATURE_LEVEL_RECORD:
+                featureControl.replay((FeatureLevelRecord) message);
+                handleFeatureControlChange();
+                break;
+            case CLIENT_QUOTA_RECORD:
+                clientQuotaControlManager.replay((ClientQuotaRecord) message);
+                break;
+            case PRODUCER_IDS_RECORD:
+                producerIdControlManager.replay((ProducerIdsRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                clusterControl.replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
+                break;
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
+                break;
+            case NO_OP_RECORD:
+                // NoOpRecord is an empty record and doesn't need to be replayed
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
         }
     }
 
@@ -1344,8 +1394,24 @@ public final class QuorumController implements Controller {
         updateLastCommittedState(-1, -1, -1);
     }
 
+    /**
+     * Handles faults that should normally be fatal to the process.
+     */
+    private final FaultHandler fatalFaultHandler;
+
+    /**
+     * Handles faults in metadata handling that are normally not fatal.
+     */
+    private final FaultHandler metadataFaultHandler;
+
+    /**
+     * The slf4j log context, used to create new loggers.
+     */
     private final LogContext logContext;
 
+    /**
+     * The slf4j logger.
+     */
     private final Logger log;
 
     /**
@@ -1530,28 +1596,34 @@ public final class QuorumController implements Controller {
 
     private final BootstrapMetadata bootstrapMetadata;
 
-    private QuorumController(LogContext logContext,
-                             int nodeId,
-                             String clusterId,
-                             KafkaEventQueue queue,
-                             Time time,
-                             KafkaConfigSchema configSchema,
-                             RaftClient<ApiMessageAndVersion> raftClient,
-                             QuorumFeatures quorumFeatures,
-                             short defaultReplicationFactor,
-                             int defaultNumPartitions,
-                             ReplicaPlacer replicaPlacer,
-                             long snapshotMaxNewRecordBytes,
-                             OptionalLong leaderImbalanceCheckIntervalNs,
-                             OptionalLong maxIdleIntervalNs,
-                             long sessionTimeoutNs,
-                             ControllerMetrics controllerMetrics,
-                             Optional<CreateTopicPolicy> createTopicPolicy,
-                             Optional<AlterConfigPolicy> alterConfigPolicy,
-                             ConfigurationValidator configurationValidator,
-                             Optional<ClusterMetadataAuthorizer> authorizer,
-                             Map<String, Object> staticConfig,
-                             BootstrapMetadata bootstrapMetadata) {
+    private QuorumController(
+        FaultHandler fatalFaultHandler,
+        FaultHandler metadataFaultHandler,
+        LogContext logContext,
+        int nodeId,
+        String clusterId,
+        KafkaEventQueue queue,
+        Time time,
+        KafkaConfigSchema configSchema,
+        RaftClient<ApiMessageAndVersion> raftClient,
+        QuorumFeatures quorumFeatures,
+        short defaultReplicationFactor,
+        int defaultNumPartitions,
+        ReplicaPlacer replicaPlacer,
+        long snapshotMaxNewRecordBytes,
+        OptionalLong leaderImbalanceCheckIntervalNs,
+        OptionalLong maxIdleIntervalNs,
+        long sessionTimeoutNs,
+        ControllerMetrics controllerMetrics,
+        Optional<CreateTopicPolicy> createTopicPolicy,
+        Optional<AlterConfigPolicy> alterConfigPolicy,
+        ConfigurationValidator configurationValidator,
+        Optional<ClusterMetadataAuthorizer> authorizer,
+        Map<String, Object> staticConfig,
+        BootstrapMetadata bootstrapMetadata
+    ) {
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.metadataFaultHandler = metadataFaultHandler;
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
new file mode 100644
index 00000000000..c57ce46fb35
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+
+/**
+ * A fault that we encountered while we replayed cluster metadata.
+ */
+public class MetadataFaultException extends RuntimeException {
+    public MetadataFaultException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MetadataFaultException(String message) {
+        super(message);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
new file mode 100644
index 00000000000..e9f71b80e67
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles faults in Kafka metadata management.
+ */
+public class MetadataFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class);
+
+    @Override
+    public void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        throw new MetadataFaultException("Encountered metadata fault: " + failureMessage, cause);
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 2cdec699da2..e8392895626 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
@@ -1182,6 +1183,30 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testFatalMetadataReplayErrorOnActive() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
+            })) {
+                QuorumController active = controlEnv.activeController();
+                CompletableFuture<Void> future = active.appendWriteEvent("errorEvent",
+                        OptionalLong.empty(), () -> {
+                            return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(
+                                    new ConfigRecord().
+                                            setName(null).
+                                            setResourceName(null).
+                                            setResourceType((byte) 255).
+                                            setValue(null), (short) 0)), null);
+                        });
+                assertThrows(ExecutionException.class, () -> future.get());
+                assertEquals(NullPointerException.class,
+                        controlEnv.fatalFaultHandler().firstException().getCause().getClass());
+                controlEnv.fatalFaultHandler().setIgnore(true);
+                controlEnv.metadataFaultHandler().setIgnore(true);
+            }
+        }
+    }
+
     private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
         for (int i = 0; i < authorizers.size(); i++) {
             assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 4cc45a9774b..40dd21c88d3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -34,6 +34,7 @@ import org.apache.kafka.controller.QuorumController.Builder;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
 
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
+    private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
+    private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
 
     public QuorumControllerTestEnv(
         LocalLogManagerTestEnv logEnv,
@@ -84,6 +87,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                 });
+                builder.setFatalFaultHandler(fatalFaultHandler);
+                builder.setMetadataFaultHandler(metadataFaultHandler);
                 builderConsumer.accept(builder);
                 this.controllers.add(builder.build());
             }
@@ -117,6 +122,14 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         return controllers;
     }
 
+    public MockFaultHandler fatalFaultHandler() {
+        return fatalFaultHandler;
+    }
+
+    public MockFaultHandler metadataFaultHandler() {
+        return metadataFaultHandler;
+    }
+
     @Override
     public void close() throws InterruptedException {
         for (QuorumController controller : controllers) {
@@ -125,5 +138,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         for (QuorumController controller : controllers) {
             controller.close();
         }
+        fatalFaultHandler.maybeRethrowFirstException();
+        metadataFaultHandler.maybeRethrowFirstException();
     }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
new file mode 100644
index 00000000000..4c03eacc32f
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+
+
+/**
+ * Handle a server fault.
+ */
+public interface FaultHandler {
+    /**
+     * Handle a fault.
+     *
+     * @param failureMessage        The failure message to log.
+     */
+    default void handleFault(String failureMessage) {
+        handleFault(failureMessage, null);
+    }
+
+    /**
+     * Handle a fault.
+     *
+     * @param failureMessage        The failure message to log.
+     * @param cause                 The exception that caused the problem, or null.
+     */
+    void handleFault(String failureMessage, Throwable cause);
+
+    /**
+     * Log a failure message about a fault.
+     *
+     * @param log               The log4j logger.
+     * @param failureMessage    The failure message.
+     * @param cause             The exception which caused the failure, or null.
+     */
+    static void logFailureMessage(Logger log, String failureMessage, Throwable cause) {
+        if (cause == null) {
+            log.error("Encountered fatal fault: {}", failureMessage);
+        } else {
+            log.error("Encountered fatal fault: {}", failureMessage, cause);
+        }
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
new file mode 100644
index 00000000000..e3b9f25a3be
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.common.utils.Exit;
+
+
+/**
+ * This is a fault handler which exits the Java process.
+ */
+public class ProcessExitingFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(ProcessExitingFaultHandler.class);
+
+    @Override
+    public void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        Exit.exit(1);
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
new file mode 100644
index 00000000000..39b3ed07847
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a fault handler suitable for use in JUnit tests. It will store the result of the first
+ * call to handleFault that was made.
+ */
+public class MockFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(MockFaultHandler.class);
+
+    private final String name;
+    private MockFaultHandlerException firstException = null;
+    private boolean ignore = false;
+
+    public MockFaultHandler(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public synchronized void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        MockFaultHandlerException e = (cause == null) ?
+                new MockFaultHandlerException(name + ": " + failureMessage) :
+                new MockFaultHandlerException(name + ": " + failureMessage +
+                        ": " + cause.getMessage(), cause);
+        if (firstException == null) {
+            firstException = e;
+        }
+        throw e;
+    }
+
+    public synchronized void maybeRethrowFirstException() {
+        if (firstException != null && !ignore) {
+            throw firstException;
+        }
+    }
+
+    public synchronized MockFaultHandlerException firstException() {
+        return firstException;
+    }
+
+    public synchronized void setIgnore(boolean ignore) {
+        this.ignore = ignore;
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
new file mode 100644
index 00000000000..ef9b11bdeb5
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+
+/**
+ * An exception thrown by MockFaultHandler.
+ */
+public class MockFaultHandlerException extends RuntimeException {
+    public MockFaultHandlerException(String failureMessage, Throwable cause) {
+        super(failureMessage, cause);
+        // If a cause exception was provided, set our stack trace to its stack trace. This is
+        // useful in junit tests where a limited number of stack frames are printed, and usually
+        // the stack frames of cause exceptions get trimmed.
+        if (cause != null) {
+            setStackTrace(cause.getStackTrace());
+        }
+    }
+
+    public MockFaultHandlerException(String failureMessage) {
+        this(failureMessage, null);
+    }
+}


[kafka] 03/07: KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit bbd659325a60c8923afaa0bf099ad0536e297914
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Tue Aug 2 15:39:47 2022 -0700

    KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)
    
    KRaft should validate that manual assignments given to createTopics are contiguous. In other words,
    they must start with partition 0, and progress through 1, 2, 3, etc. ZK mode does this, but KRaft
    mode previously did not. Also fix a null pointer exception when the placement for partition 0
    was not specified.
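
    As an illustration of the rule above, a minimal Scala sketch of a contiguity check (the
    helper name and Map-based signature are assumptions for this example; the actual validation
    is the Java check added to ReplicationControlManager in the diff below):

        object AssignmentValidation {
          // A manual assignment is contiguous when it covers partitions 0..n-1 with no gaps.
          def isContiguousAssignment(assignment: Map[Int, Seq[Int]]): Boolean =
            (0 until assignment.size).forall(assignment.contains)
        }

        // isContiguousAssignment(Map(0 -> Seq(0, 1), 1 -> Seq(1, 0)))  // true
        // isContiguousAssignment(Map(1 -> Seq(0, 1), 2 -> Seq(1, 0)))  // false: partition 0 missing
        // isContiguousAssignment(Map(0 -> Seq(0, 1), 2 -> Seq(1, 0)))  // false: gap at partition 1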
    
    Convert AddPartitionsTest over to use KRaft. This PR converts all of the tests except for some of
    the placement logic tests, which will need to be redone for KRaft mode in a future change.
    
    Fix a null pointer exception in KRaftMetadataCache#getPartitionInfo. Specifically, we should not
    assume that the partition will be found in the hash map. This is another case where we had
    "Some(x)" when it should have been "Option(x)".
    
    Fix a potential null pointer exception in BrokerServer#state.
    
    Reviewers: dengziming <de...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   5 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |   2 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 164 ++++++++++++++-------
 .../controller/ReplicationControlManager.java      |   8 +-
 4 files changed, 123 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index eb21c1ed25e..0bdd6734975 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -81,7 +81,8 @@ class BrokerServer(
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
 ) extends KafkaBroker {
 
-  override def brokerState: BrokerState = lifecycleManager.state
+  override def brokerState: BrokerState = Option(lifecycleManager).
+    flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING)
 
   import kafka.server.Server._
 
@@ -89,7 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  @volatile private var lifecycleManager: BrokerLifecycleManager = null
+  @volatile var lifecycleManager: BrokerLifecycleManager = null
 
   private val isShuttingDown = new AtomicBoolean(false)
 
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index ae2e6523573..52577211503 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -229,7 +229,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 
   override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
     Option(_currentImage.topics().getTopic(topicName)).
-      flatMap(topic => Some(topic.partitions().get(partitionId))).
+      flatMap(topic => Option(topic.partitions().get(partitionId))).
       flatMap(partition => Some(new UpdateMetadataPartitionState().
         setTopicName(topicName).
         setPartitionIndex(partitionId).
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ea4215d9c39..4e2bfee60ee 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,18 +17,24 @@
 
 package kafka.admin
 
-import java.util.Optional
+import java.util.{Collections, Optional}
 import kafka.controller.ReplicaAssignment
-import kafka.server.BaseRequestTest
-import kafka.utils.TestUtils
+import kafka.server.{BaseRequestTest, BrokerServer}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections.singletonList
+import java.util.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
 class AddPartitionsTest extends BaseRequestTest {
@@ -47,44 +53,97 @@ class AddPartitionsTest extends BaseRequestTest {
   val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
   val topic5 = "new-topic5"
   val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
+  var admin: Admin = null
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
 
+    if (isKRaftTest()) {
+      brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
+    }
     createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic4, partitionReplicaAssignment = topic4Assignment.map { case (k, v) => k -> v.replicas })
+    admin = createAdminClient()
   }
 
-  @Test
-  def testWrongReplicaCount(): Unit = {
-    assertThrows(classOf[InvalidReplicaAssignmentException], () => adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 2,
-      Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2)))))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWrongReplicaCount(quorum: String): Unit = {
+    assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => {
+        admin.createPartitions(Collections.singletonMap(topic1,
+          NewPartitions.increaseTo(2, singletonList(asList(0, 1, 2))))).all().get()
+      }).getCause.getClass)
   }
 
-  @Test
-  def testMissingPartition0(): Unit = {
-    val e = assertThrows(classOf[AdminOperationException], () => adminZkClient.addPartitions(topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
-      Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
-    assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition id 0 is missing"))
+  /**
+   * Test that when we supply a manual partition assignment to createTopics, it must be 0-based
+   * and consecutive.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMissingPartitionsInCreateTopics(quorum: String): Unit = {
+    val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
+    topic6Placements.put(1, asList(0, 1))
+    topic6Placements.put(2, asList(1, 0))
+    val topic7Placements = new util.HashMap[Integer, util.List[Integer]]
+    topic7Placements.put(2, asList(0, 1))
+    topic7Placements.put(3, asList(1, 0))
+    val futures = admin.createTopics(asList(
+      new NewTopic("new-topic6", topic6Placements),
+      new NewTopic("new-topic7", topic7Placements))).values()
+    val topic6Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic6").get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], topic6Cause.getClass)
+    assertTrue(topic6Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
+      "Unexpected error message: " + topic6Cause.getMessage)
+    val topic7Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic7").get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], topic7Cause.getClass)
+    assertTrue(topic7Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
+      "Unexpected error message: " + topic7Cause.getMessage)
   }
 
-  @Test
-  def testIncrementPartitions(): Unit = {
-    adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 3)
+  /**
+   * Test that when we supply a manual partition assignment to createPartitions, it must contain
+   * enough partitions.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMissingPartitionsInCreatePartitions(quorum: String): Unit = {
+    val cause = assertThrows(classOf[ExecutionException], () =>
+      admin.createPartitions(Collections.singletonMap(topic1,
+        NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
+    if (isKRaftTest()) {
+      assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " +
+        "were specified."), "Unexpected error message: " + cause.getMessage)
+    } else {
+      assertTrue(cause.getMessage.contains("Increasing the number of partitions by 2 but 1 assignments provided."),
+        "Unexpected error message: " + cause.getMessage)
+    }
+    if (!isKRaftTest()) {
+      // In ZK mode, test the raw AdminZkClient method as well.
+      val e = assertThrows(classOf[AdminOperationException], () => adminZkClient.addPartitions(
+        topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
+        Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
+      assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition " +
+        "id 0 is missing"))
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncrementPartitions(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic1, NewPartitions.increaseTo(3))).all().get()
+
     // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2)
-    val leader1FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic1, 1)).get
-    val leader2FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic1, 2)).get
-    assertEquals(leader1, leader1FromZk)
-    assertEquals(leader2, leader2FromZk)
+    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1)
+    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 2)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic1, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic1, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic1, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic1, 2)
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
@@ -102,22 +161,21 @@ class AddPartitionsTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testManualAssignmentOfReplicas(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testManualAssignmentOfReplicas(quorum: String): Unit = {
     // Add 2 partitions
-    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3,
-      Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
+    admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3,
+      asList(asList(0, 1), asList(2, 3))))).all().get()
     // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2)
-    val leader1FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic2, 1)).get
-    val leader2FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic2, 2)).get
-    assertEquals(leader1, leader1FromZk)
-    assertEquals(leader2, leader2FromZk)
+    val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1)
+    val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic2, 2)
+    val partition1Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
+    assertEquals(leader1, partition1Metadata.leader())
+    val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
+    assertEquals(leader2, partition2Metadata.leader())
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
@@ -132,17 +190,18 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(Set(0, 1), replicas.asScala.toSet)
   }
 
-  @Test
-  def testReplicaPlacementAllServers(): Unit = {
-    adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  def testReplicaPlacementAllServers(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get()
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic3, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 2)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 3)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 4)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 5)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 6)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 3)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 4)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 5)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 6)
 
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
@@ -157,13 +216,14 @@ class AddPartitionsTest extends BaseRequestTest {
     validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
   }
 
-  @Test
-  def testReplicaPlacementPartialServers(): Unit = {
-    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  def testReplicaPlacementPartialServers(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get()
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic2, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
 
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 3a3788c41a5..bf3a679d2ce 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -666,6 +666,12 @@ public class ReplicationControlManager {
                     Replicas.toArray(assignment.brokerIds()), Replicas.toArray(isr),
                     Replicas.NONE, Replicas.NONE, isr.get(0), LeaderRecoveryState.RECOVERED, 0, 0));
             }
+            for (int i = 0; i < newParts.size(); i++) {
+                if (!newParts.containsKey(i)) {
+                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                            "partitions should be a consecutive 0-based integer sequence");
+                }
+            }
             ApiError error = maybeCheckCreateTopicPolicy(() -> {
                 Map<Integer, List<Integer>> assignments = new HashMap<>();
                 newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
@@ -744,7 +750,7 @@ public class ReplicationControlManager {
                     setIsSensitive(entry.isSensitive()));
             }
             result.setNumPartitions(newParts.size());
-            result.setReplicationFactor((short) newParts.get(0).replicas.length);
+            result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length);
             result.setTopicConfigErrorCode(NONE.code());
         } else {
             result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());


[kafka] 06/07: MINOR: Convert some junit tests to kraft (#12443)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 112294334f4a2e539c37f8ea2a6064af7443351f
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Jul 27 10:38:31 2022 -0700

    MINOR: Convert some junit tests to kraft (#12443)
    
    Convert ProducerCompressionTest, MirrorMakerIntegrationTest, EdgeCaseRequestTest to kraft.
    
    Make it explicit that ServerShutdownTest#testControllerShutdownDuringSend is ZK-only.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../kafka/api/ProducerCompressionTest.scala        | 49 +++++++++++-----------
 .../kafka/tools/MirrorMakerIntegrationTest.scala   | 24 ++++++-----
 .../unit/kafka/server/EdgeCaseRequestTest.scala    | 32 ++++++++------
 .../unit/kafka/server/ServerShutdownTest.scala     | 10 +++--
 4 files changed, 64 insertions(+), 51 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index ccdfe7d3d36..07d9ccb024f 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,19 +17,19 @@
 
 package kafka.api.test
 
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
-import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.{Collections, Properties}
-import scala.jdk.CollectionConverters._
 
 class ProducerCompressionTest extends QuorumTestHarness {
 
@@ -37,18 +37,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
   private val topic = "topic"
   private val numRecords = 2000
 
-  private var server: KafkaServer = null
+  private var broker: KafkaBroker = null
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
-    server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull)
+    broker = createBroker(new KafkaConfig(props))
   }
 
   @AfterEach
   override def tearDown(): Unit = {
-    TestUtils.shutdownServers(Seq(server))
+    TestUtils.shutdownServers(Seq(broker))
     super.tearDown()
   }
 
@@ -58,11 +58,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
    * Compressed messages should be able to sent and consumed correctly
    */
   @ParameterizedTest
-  @MethodSource(Array("parameters"))
-  def testCompression(compression: String): Unit = {
+  @CsvSource(value = Array(
+    "kraft,none",
+    "kraft,gzip",
+    "kraft,snappy",
+    "kraft,lz4",
+    "kraft,zstd",
+    "zk,gzip"
+  ))
+  def testCompression(quorum: String, compression: String): Unit = {
 
     val producerProps = new Properties()
-    val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(server))
+    val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
     producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
@@ -72,7 +79,13 @@ class ProducerCompressionTest extends QuorumTestHarness {
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
+      val admin = TestUtils.createAdminClient(Seq(broker),
+        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+      try {
+        TestUtils.createTopicWithAdmin(admin, topic, Seq(broker))
+      } finally {
+        admin.close()
+      }
       val partition = 0
 
       // prepare the messages
@@ -103,15 +116,3 @@ class ProducerCompressionTest extends QuorumTestHarness {
     }
   }
 }
-
-object ProducerCompressionTest {
-  def parameters: java.util.stream.Stream[Arguments] = {
-    Seq(
-      Arguments.of("none"),
-      Arguments.of("gzip"),
-      Arguments.of("snappy"),
-      Arguments.of("lz4"),
-      Arguments.of("zstd")
-    ).asJava.stream()
-  }
-}
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 4f673cdd60a..c64d25fe4e6 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -18,26 +18,27 @@ package kafka.tools
 
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-
 import scala.collection.Seq
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.utils.Exit
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 @deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
   override def generateConfigs: Seq[KafkaConfig] =
-    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
+    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, new Properties()))
 
   val exited = new AtomicBoolean(false)
 
@@ -57,8 +58,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testCommitOffsetsThrowTimeoutException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCommitOffsetsThrowTimeoutException(quorum: String): Unit = {
     val consumerProps = new Properties
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -70,8 +72,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     assertThrows(classOf[TimeoutException], () => mirrorMakerConsumer.commit())
   }
 
-  @Test
-  def testCommitOffsetsRemoveNonExistentTopics(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCommitOffsetsRemoveNonExistentTopics(quorum: String): Unit = {
     val consumerProps = new Properties
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -85,8 +88,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     assertTrue(mirrorMakerConsumer.offsets.isEmpty, "Offsets for non-existent topics should be removed")
   }
 
-  @Test
-  def testCommaSeparatedRegex(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCommaSeparatedRegex(quorum: String): Unit = {
     val topic = "new-topic"
     val msg = "a test message"
 
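
The hunks above show the conversion pattern applied throughout this patch: drop `@Test`, add a `quorum` parameter fed by `@ValueSource` with "zk" and "kraft", use the `TestInfoUtils` name template so the harness can see which mode was requested, and build broker configs with `zkConnectOrNull` so they stay valid when no ZooKeeper is running. A self-contained sketch of the pattern follows; the class name and test body are hypothetical.

    import java.util.Properties
    import scala.collection.Seq
    import kafka.integration.KafkaServerTestHarness
    import kafka.server.KafkaConfig
    import kafka.utils.{TestInfoUtils, TestUtils}
    import org.junit.jupiter.api.Assertions.assertTrue
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    // Hypothetical test class illustrating the @Test -> @ParameterizedTest conversion.
    class QuorumConversionExampleTest extends KafkaServerTestHarness {

      // zkConnectOrNull is null under KRaft, so one config path serves both modes.
      override def generateConfigs: Seq[KafkaConfig] =
        TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, new Properties()))

      @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
      @ValueSource(strings = Array("zk", "kraft"))
      def testClusterStarts(quorum: String): Unit = {
        // `brokers` is populated in both ZK and KRaft mode; the pre-existing
        // `servers` accessor assumes a ZK-backed cluster.
        assertTrue(brokers.nonEmpty)
      }
    }
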
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 1a383a8fbcd..1bbde3ffb6b 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -35,19 +35,20 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.ByteUtils
 import org.apache.kafka.common.{TopicPartition, requests}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
 class EdgeCaseRequestTest extends KafkaServerTestHarness {
 
   def generateConfigs = {
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
     props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
     List(KafkaConfig.fromProps(props))
   }
 
-  private def socketServer = servers.head.socketServer
+  private def socketServer = brokers.head.socketServer
 
   private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
     new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
@@ -116,8 +117,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testProduceRequestWithNullClientId(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceRequestWithNullClientId(quorum: String): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
     val correlationId = -1
@@ -161,23 +163,27 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode), "There should be no error")
   }
 
-  @Test
-  def testHeaderOnlyRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testHeaderOnlyRequest(quorum: String): Unit = {
     verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, 1))
   }
 
-  @Test
-  def testInvalidApiKeyRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidApiKeyRequest(quorum: String): Unit = {
     verifyDisconnect(requestHeaderBytes(-1, 0))
   }
 
-  @Test
-  def testInvalidApiVersionRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidApiVersionRequest(quorum: String): Unit = {
     verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, -1))
   }
 
-  @Test
-  def testMalformedHeaderRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMalformedHeaderRequest(quorum: String): Unit = {
     val serializedBytes = {
       // Only send apiKey and apiVersion
       val buffer = ByteBuffer.allocate(
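
What ties the `quorum` parameter back to the harness is the display name: the `TestInfoUtils.TestWithParameterizedQuorumName` template embeds the chosen value in the JUnit display name, and the shared harness can recover it from the `TestInfo` injected at setup to decide whether to start ZooKeeper or a KRaft controller. A minimal sketch of that idea, not the exact harness code:

    import org.junit.jupiter.api.TestInfo

    // Sketch: with a display-name template along the lines of "{displayName}.quorum={0}",
    // the mode requested via @ValueSource can be read back from the test's display name.
    object QuorumNameSniffer {
      def isKRaft(testInfo: TestInfo): Boolean =
        testInfo.getDisplayName.contains("quorum=kraft")
    }
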
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 96aeac5fa61..70554d9427c 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
@@ -251,9 +251,11 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   }
 
   // Verify that if controller is in the midst of processing a request, shutdown completes
-  // without waiting for request timeout.
-  @Test
-  def testControllerShutdownDuringSend(): Unit = {
+  // without waiting for the request timeout. Since this involves a LeaderAndIsr request,
+  // it is ZK-only for now.
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testControllerShutdownDuringSend(quorum: String): Unit = {
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)