Posted to commits@kafka.apache.org by cm...@apache.org on 2022/01/26 07:39:46 UTC

[kafka] branch trunk updated: MINOR: Add shutdown tests for KRaft (#11606)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da25add  MINOR: Add shutdown tests for KRaft (#11606)
da25add is described below

commit da25add383b3c855bfaa7cd12eff84539fe27e38
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Wed Jan 26 02:38:11 2022 -0500

    MINOR: Add shutdown tests for KRaft (#11606)
    
    Augments the existing shutdown tests to also cover KRaft. Adds the ability to update configs
    in KRaft tests, and, in both the ZK and KRaft cases, to update configs without losing the
    server's log directory and data.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   5 +-
 .../kafka/server/QuorumTestHarness.scala           |  34 +++--
 .../kafka/integration/KafkaServerTestHarness.scala |  77 ++++++----
 .../unit/kafka/server/ServerShutdownTest.scala     | 164 ++++++++++++---------
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  10 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  10 +-
 6 files changed, 181 insertions(+), 119 deletions(-)
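
For illustration only (this sketch is not part of the patch): a test built on the updated
harness can now change configs and restart a broker while reusing its log directory. The
class and test names below are hypothetical; every helper used (generateConfigs, zkConnectOrNull,
recreateBrokers, KafkaConfig.LogDirProp) appears in the diff that follows.

    import java.util.Properties

    import kafka.integration.KafkaServerTestHarness
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    import scala.collection.Seq

    class BrokerRestartSketchTest extends KafkaServerTestHarness {
      // Properties merged into the broker config each time configs are (re)generated.
      val overrides = new Properties()

      override def generateConfigs: Seq[KafkaConfig] =
        Seq(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnectOrNull).head, overrides))

      @ParameterizedTest
      @ValueSource(strings = Array("zk", "kraft"))
      def testRestartKeepsLogDir(quorum: String): Unit = {
        // Pin the current log directory so the regenerated config reuses the same data.
        overrides.put(KafkaConfig.LogDirProp, brokers.head.config.logDirs.head)
        // Tear the broker down without deleting its log directory, rebuild it from the
        // regenerated config, and start it again; the on-disk data survives the restart.
        recreateBrokers(reconfigure = true, startup = true)
      }
    }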

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 70c157c..5976db0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -21,8 +21,7 @@ import java.net.InetAddress
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
-
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
 import kafka.cluster.Broker.ServerInfo
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
@@ -434,7 +433,7 @@ class BrokerServer(
         maybeChangeStatus(STARTING, STARTED)
         fatal("Fatal error during broker startup. Prepare to shutdown", e)
         shutdown()
-        throw e
+        throw if (e.isInstanceOf[ExecutionException]) e.getCause else e
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index ab2cb34..ad710fd 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -44,8 +44,9 @@ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, T
 import scala.collection.{Seq, immutable}
 
 trait QuorumImplementation {
-  def createAndStartBroker(config: KafkaConfig,
-                           time: Time): KafkaBroker
+  def createBroker(config: KafkaConfig,
+                   time: Time,
+                   startup: Boolean): KafkaBroker
 
   def shutdown(): Unit
 }
@@ -54,10 +55,11 @@ class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
                                     val zkClient: KafkaZkClient,
                                     val adminZkClient: AdminZkClient,
                                     val log: Logging) extends QuorumImplementation {
-  override def createAndStartBroker(config: KafkaConfig,
-                                    time: Time): KafkaBroker = {
+  override def createBroker(config: KafkaConfig,
+                            time: Time,
+                            startup: Boolean): KafkaBroker = {
     val server = new KafkaServer(config, time, None, false)
-    server.startup()
+    if (startup) server.startup()
     server
   }
 
@@ -73,8 +75,9 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV
                                 val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
                                 val clusterId: String,
                                 val log: Logging) extends QuorumImplementation {
-  override def createAndStartBroker(config: KafkaConfig,
-                                    time: Time): KafkaBroker = {
+  override def createBroker(config: KafkaConfig,
+                            time: Time,
+                            startup: Boolean): KafkaBroker = {
     val broker = new BrokerServer(config = config,
       metaProps = new MetaProperties(clusterId, config.nodeId),
       raftManager = raftManager,
@@ -84,7 +87,7 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV
       initialOfflineDirs = Seq(),
       controllerQuorumVotersFuture = controllerQuorumVotersFuture,
       supportedFeatures = Collections.emptyMap())
-    broker.startup()
+    if (startup) broker.startup()
     broker
   }
 
@@ -197,14 +200,20 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
-  def createAndStartBroker(config: KafkaConfig,
-                           time: Time = Time.SYSTEM): KafkaBroker = {
-    implementation.createAndStartBroker(config,
-      time)
+  def createBroker(config: KafkaConfig,
+                   time: Time = Time.SYSTEM,
+                   startup: Boolean = true): KafkaBroker = {
+    implementation.createBroker(config, time, startup)
   }
 
   def shutdownZooKeeper(): Unit = asZk().shutdown()
 
+  def shutdownKRaftController(): Unit = {
+    // Note that the RaftManager instance is left running; it will be shut down in tearDown()
+    val kRaftQuorumImplementation = asKRaft()
+    CoreUtils.swallow(kRaftQuorumImplementation.controllerServer.shutdown(), kRaftQuorumImplementation.log)
+  }
+
   private def formatDirectories(directories: immutable.Seq[String],
                                 metaProperties: MetaProperties): Unit = {
     val stream = new ByteArrayOutputStream()
@@ -274,7 +283,6 @@ abstract class QuorumTestHarness extends Logging {
       })
       controllerServer.startup()
       raftManager.startup()
-      controllerServer.startup()
     } catch {
       case e: Throwable =>
         CoreUtils.swallow(raftManager.shutdown(), this)
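
For illustration only (not part of the patch): inside a test that extends QuorumTestHarness,
the new startup flag and the controller shutdown hook compose roughly as follows. Here
`config` is assumed to be a broker KafkaConfig built via TestUtils.createBrokerConfigs, and
the harness setUp is assumed to have already run.

    // Build a broker without starting it, so the test can stage state first.
    val broker = createBroker(config, startup = false)
    broker.startup()
    if (isKRaftTest()) {
      // Stops only the controller; the RaftManager it uses is still shut down in tearDown().
      shutdownKRaftController()
    }
    broker.shutdown()
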
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index acb1201..9f7a77a 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -114,23 +114,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     // default implementation is a no-op, it is overridden by subclasses if required
     configureSecurityBeforeServersStart()
 
-    // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
-    // are shutdown cleanly in tearDown even if a subsequent broker fails to start
-    for (config <- configs) {
-      if (isKRaftTest()) {
-        _brokers += createAndStartBroker(config, brokerTime(config.brokerId))
-      } else {
-        _brokers += TestUtils.createServer(
-          config,
-          time = brokerTime(config.brokerId),
-          threadNamePrefix = None,
-          enableForwarding
-        )
-      }
-    }
-    brokerList = TestUtils.bootstrapServers(_brokers, listenerName)
-    alive = new Array[Boolean](_brokers.length)
-    Arrays.fill(alive, true)
+    createBrokers(startup = true)
+
 
     // default implementation is a no-op, it is overridden by subclasses if required
     configureSecurityAfterServersStart()
@@ -142,6 +127,21 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     super.tearDown()
   }
 
+  def recreateBrokers(reconfigure: Boolean = false, startup: Boolean = false): Unit = {
+    // The test must be allowed to fail and be torn down if an exception is raised here.
+    if (reconfigure) {
+      instanceConfigs = null
+    }
+    if (configs.isEmpty)
+      throw new KafkaException("Must supply at least one server config.")
+
+    TestUtils.shutdownServers(_brokers, deleteLogDirs = false)
+    _brokers.clear()
+    Arrays.fill(alive, false)
+
+    createBrokers(startup)
+  }
+
   /**
    * Create a topic.
    * Wait until the leader is elected and the metadata is propagated to all brokers.
@@ -211,19 +211,16 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     if (reconfigure) {
       instanceConfigs = null
     }
+    if (configs.isEmpty)
+      throw new KafkaException("Must supply at least one server config.")
     for(i <- _brokers.indices if !alive(i)) {
       if (reconfigure) {
-        _brokers(i) = TestUtils.createServer(
-          configs(i),
-          time = brokerTime(configs(i).brokerId),
-          threadNamePrefix = None,
-          enableForwarding
-        )
-      } else {
-        brokers(i).startup()
+        _brokers(i) = createBrokerFromConfig(configs(i))
       }
+      _brokers(i).startup()
       alive(i) = true
     }
+    brokerList = TestUtils.bootstrapServers(_brokers, listenerName)
   }
 
   def waitForUserScramCredentialToAppearOnAllBrokers(clientPrincipal: String, mechanismName: String): Unit = {
@@ -264,4 +261,34 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     getController().kafkaController.controllerContext.topicNames.toMap
   }
 
+  private def createBrokers(startup: Boolean): Unit = {
+    // Add each broker to `brokers` buffer as soon as it is created to ensure that brokers
+    // are shutdown cleanly in tearDown even if a subsequent broker fails to start
+    val potentiallyRegeneratedConfigs = configs
+    alive = new Array[Boolean](potentiallyRegeneratedConfigs.length)
+    Arrays.fill(alive, false)
+    for (config <- potentiallyRegeneratedConfigs) {
+      val broker = createBrokerFromConfig(config)
+      _brokers += broker
+      if (startup) {
+        broker.startup()
+        alive(_brokers.length - 1) = true
+      }
+    }
+    brokerList = if (startup) TestUtils.bootstrapServers(_brokers, listenerName) else null
+  }
+
+  private def createBrokerFromConfig(config: KafkaConfig) = {
+    if (isKRaftTest()) {
+      createBroker(config, brokerTime(config.brokerId), startup = false)
+    } else {
+      TestUtils.createServer(
+        config,
+        time = brokerTime(config.brokerId),
+        threadNamePrefix = None,
+        enableForwarding,
+        startup = false
+      )
+    }
+  }
 }
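
Taken together, these harness changes mean a test can bounce an individual broker and have its
config regenerated before it comes back, along these lines (a sketch; the "state" it alludes to
is whatever the test's own generateConfigs reads, as ServerShutdownTest does below):

    killBroker(0)                          // stop broker 0, keep its log directory
    // ...mutate whatever the test's generateConfigs reads, e.g. a Properties field...
    restartDeadBrokers(reconfigure = true) // regenerate configs, recreate and start dead brokers
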
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
old mode 100755
new mode 100644
index 013d084..c48e83b
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,16 +16,15 @@
  */
 package kafka.server
 
-import kafka.server.QuorumTestHarness
 import kafka.utils.{CoreUtils, TestUtils}
-import kafka.utils.TestUtils._
 
 import java.io.{DataInputStream, File}
 import java.net.ServerSocket
 import java.util.Collections
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{CancellationException, Executors, TimeUnit}
 import kafka.cluster.Broker
 import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
+import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogManager
 import kafka.zookeeper.ZooKeeperClientTimeoutException
 import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -42,55 +41,76 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 @Timeout(60)
-class ServerShutdownTest extends QuorumTestHarness {
-  var config: KafkaConfig = null
+class ServerShutdownTest extends KafkaServerTestHarness {
   val host = "localhost"
   val topic = "test"
   val sent1 = List("hello", "there")
   val sent2 = List("more", "messages")
+  val propsToChangeUponRestart = new Properties()
+  var priorConfig: Option[KafkaConfig] = None
+
+  override def generateConfigs: Seq[KafkaConfig] = {
+    priorConfig.foreach { config =>
+      // keep the same log directory
+      val originals = config.originals
+      val logDirsValue = originals.get(KafkaConfig.LogDirsProp)
+      if (logDirsValue != null) {
+        propsToChangeUponRestart.put(KafkaConfig.LogDirsProp, logDirsValue)
+      } else {
+        propsToChangeUponRestart.put(KafkaConfig.LogDirProp, originals.get(KafkaConfig.LogDirProp))
+      }
+    }
+    priorConfig = Some(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnectOrNull).head, propsToChangeUponRestart))
+    Seq(priorConfig.get)
+  }
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+    // be sure to clear local variables before setting up so that anything leftover from a prior test
+    // won't impact the initial config for the current test
+    priorConfig = None
+    propsToChangeUponRestart.clear()
     super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(0, zkConnect)
-    config = KafkaConfig.fromProps(props)
   }
 
-  @Test
-  def testCleanShutdown(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCleanShutdown(quorum: String): Unit = {
 
-    def createProducer(server: KafkaServer): KafkaProducer[Integer, String] =
+    def createProducer(broker: KafkaBroker): KafkaProducer[Integer, String] =
       TestUtils.createProducer(
-        TestUtils.getBrokerListStrFromServers(Seq(server)),
+        TestUtils.getBrokerListStrFromServers(Seq(broker)),
         keySerializer = new IntegerSerializer,
         valueSerializer = new StringSerializer
       )
 
-    def createConsumer(server: KafkaServer): KafkaConsumer[Integer, String] =
+    def createConsumer(broker: KafkaBroker): KafkaConsumer[Integer, String] =
       TestUtils.createConsumer(
-        TestUtils.getBrokerListStrFromServers(Seq(server)),
+        TestUtils.getBrokerListStrFromServers(Seq(broker)),
         securityProtocol = SecurityProtocol.PLAINTEXT,
         keyDeserializer = new IntegerDeserializer,
         valueDeserializer = new StringDeserializer
       )
 
-    var server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
-    server.startup()
-    var producer = createProducer(server)
+    var producer = createProducer(broker)
 
     // create topic
-    createTopic(zkClient, topic, servers = Seq(server))
+    createTopic(topic)
 
     // send some messages
     sent1.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
 
     // do a clean shutdown and check that offset checkpoint file exists
-    server.shutdown()
+    shutdownBroker()
     for (logDir <- config.logDirs) {
       val OffsetCheckpointFile = new File(logDir, LogManager.RecoveryPointCheckpointFile)
       assertTrue(OffsetCheckpointFile.exists)
@@ -99,14 +119,13 @@ class ServerShutdownTest extends QuorumTestHarness {
     producer.close()
 
     /* now restart the server and check that the written data is still readable and everything still works */
-    server = new KafkaServer(config)
-    server.startup()
+    restartBroker()
 
     // wait for the broker to receive the update metadata request after startup
-    TestUtils.waitForPartitionMetadata(Seq(server), topic, 0)
+    TestUtils.waitForPartitionMetadata(Seq(broker), topic, 0)
 
-    producer = createProducer(server)
-    val consumer = createConsumer(server)
+    producer = createProducer(broker)
+    val consumer = createConsumer(broker)
     consumer.subscribe(Seq(topic).asJava)
 
     val consumerRecords = TestUtils.consumeRecords(consumer, sent1.size)
@@ -120,66 +139,67 @@ class ServerShutdownTest extends QuorumTestHarness {
 
     consumer.close()
     producer.close()
-    server.shutdown()
-    CoreUtils.delete(server.config.logDirs)
-    verifyNonDaemonThreadsStatus()
   }
 
-  @Test
-  def testCleanShutdownAfterFailedStartup(): Unit = {
-    val newProps = TestUtils.createBrokerConfig(0, zkConnect)
-    newProps.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
-    newProps.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
-    val newConfig = KafkaConfig.fromProps(newProps)
-    verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException](newConfig)
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCleanShutdownAfterFailedStartup(quorum: String): Unit = {
+    if (quorum == "zk") {
+      propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
+      propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
+      verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException](quorum)
+    } else {
+      propsToChangeUponRestart.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "1000")
+      shutdownBroker()
+      shutdownKRaftController()
+      verifyCleanShutdownAfterFailedStartup[CancellationException](quorum)
+    }
   }
 
-  @Test
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(): Unit = {
-    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
-    server.startup()
-    createTopic(zkClient, topic, servers = Seq(server))
-    server.shutdown()
-    server.awaitShutdown()
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
+    createTopic(topic)
+    shutdownBroker()
     config.logDirs.foreach { dirName =>
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
     }
-    verifyCleanShutdownAfterFailedStartup[KafkaStorageException](config)
+    verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum)
   }
 
-  @Test
-  def testCleanShutdownWithZkUnavailable(): Unit = {
-    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
-    server.startup()
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testCleanShutdownWithZkUnavailable(quorum: String): Unit = {
     shutdownZooKeeper()
-    server.shutdown()
-    server.awaitShutdown()
-    CoreUtils.delete(server.config.logDirs)
+    shutdownBroker()
+    CoreUtils.delete(broker.config.logDirs)
+    verifyNonDaemonThreadsStatus()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {
+    shutdownKRaftController()
+    shutdownBroker()
+    CoreUtils.delete(broker.config.logDirs)
     verifyNonDaemonThreadsStatus()
   }
 
-  private def verifyCleanShutdownAfterFailedStartup[E <: Exception](config: KafkaConfig)(implicit exceptionClassTag: ClassTag[E]): Unit = {
-    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
+  private def verifyCleanShutdownAfterFailedStartup[E <: Exception](quorum: String)(implicit exceptionClassTag: ClassTag[E]): Unit = {
     try {
-      server.startup()
+      recreateBroker(startup = true)
       fail("Expected KafkaServer setup to fail and throw exception")
-    }
-    catch {
+    } catch {
       // Try to clean up carefully without hanging even if the test fails. This means trying to accurately
       // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
       // goes wrong so that awaitShutdown doesn't hang
       case e: Exception =>
         assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
-        assertEquals(BrokerState.NOT_RUNNING, server.brokerState)
-    }
-    finally {
-      if (server.brokerState != BrokerState.NOT_RUNNING)
-        server.shutdown()
-      server.awaitShutdown()
+        assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
+    } finally {
+      shutdownBroker()
     }
-    CoreUtils.delete(server.config.logDirs)
-    verifyNonDaemonThreadsStatus()
   }
 
   private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = {
@@ -192,13 +212,11 @@ class ServerShutdownTest extends QuorumTestHarness {
       .count(isNonDaemonKafkaThread))
   }
 
-  @Test
-  def testConsecutiveShutdown(): Unit = {
-    val server = new KafkaServer(config)
-    server.startup()
-    server.shutdown()
-    server.awaitShutdown()
-    server.shutdown()
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsecutiveShutdown(quorum: String): Unit = {
+    shutdownBroker()
+    brokers.head.shutdown()
   }
 
   // Verify that if controller is in the midst of processing a request, shutdown completes
@@ -255,4 +273,14 @@ class ServerShutdownTest extends QuorumTestHarness {
       metrics.close()
     }
   }
+
+  private def config: KafkaConfig = configs.head
+  private def broker: KafkaBroker = brokers.head
+  private def shutdownBroker(): Unit = killBroker(0) // idempotent
+  private def restartBroker(): Unit = {
+    shutdownBroker()
+    restartDeadBrokers(reconfigure = !propsToChangeUponRestart.isEmpty)
+  }
+  private def recreateBroker(startup: Boolean): Unit =
+    recreateBrokers(reconfigure = !propsToChangeUponRestart.isEmpty, startup = startup)
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 0a884f1..a6d4019 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -74,7 +74,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
   def shouldFollowLeaderEpochBasicWorkflow(): Unit = {
 
     //Given 2 brokers
-    brokers = (100 to 101).map(createBroker(_))
+    brokers = (100 to 101).map(createBrokerForId(_))
 
     //A single partition topic with 2 replicas
     TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
@@ -183,7 +183,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
   def offsetsShouldNotGoBackwards(): Unit = {
 
     //Given two brokers
-    brokers = (100 to 101).map(createBroker(_))
+    brokers = (100 to 101).map(createBrokerForId(_))
 
     //A single partition topic with 2 replicas
     TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
@@ -258,7 +258,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
     val tp = new TopicPartition(topic, 0)
 
     //Given 2 brokers
-    brokers = (100 to 101).map(createBroker(_))
+    brokers = (100 to 101).map(createBrokerForId(_))
 
     //A single partition topic with 2 replicas
     TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
@@ -298,7 +298,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
   def logsShouldNotDivergeOnUncleanLeaderElections(): Unit = {
 
     // Given two brokers, unclean leader election is enabled
-    brokers = (100 to 101).map(createBroker(_, enableUncleanLeaderElection = true))
+    brokers = (100 to 101).map(createBrokerForId(_, enableUncleanLeaderElection = true))
 
     // A single partition topic with 2 replicas, min.isr = 1
     TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers,
@@ -463,7 +463,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
     brokers.filter(_.config.brokerId != leader).head
   }
 
-  private def createBroker(id: Int, enableUncleanLeaderElection: Boolean = false): KafkaServer = {
+  private def createBrokerForId(id: Int, enableUncleanLeaderElection: Boolean = false): KafkaServer = {
     val config = createBrokerConfig(id, zkConnect)
     TestUtils.setIbpAndMessageFormatVersions(config, apiVersion)
     config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, enableUncleanLeaderElection.toString)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f91d80c..7bbb6e8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -170,12 +170,12 @@ object TestUtils extends Logging {
   }
 
   def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String]): KafkaServer = {
-    createServer(config, time, threadNamePrefix, enableForwarding = false)
+    createServer(config, time, threadNamePrefix, enableForwarding = false, startup = true)
   }
 
-  def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], enableForwarding: Boolean): KafkaServer = {
+  def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], enableForwarding: Boolean, startup: Boolean): KafkaServer = {
     val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding)
-    server.startup()
+    if (startup) server.startup()
     server
   }
 
@@ -241,12 +241,12 @@ object TestUtils extends Logging {
   /**
     * Shutdown `servers` and delete their log directories.
     */
-  def shutdownServers[B <: KafkaBroker](brokers: Seq[B]): Unit = {
+  def shutdownServers[B <: KafkaBroker](brokers: Seq[B], deleteLogDirs: Boolean = true): Unit = {
     import ExecutionContext.Implicits._
     val future = Future.traverse(brokers) { s =>
       Future {
         s.shutdown()
-        CoreUtils.delete(s.config.logDirs)
+        if (deleteLogDirs) CoreUtils.delete(s.config.logDirs)
       }
     }
     Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
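
For illustration only: the new deleteLogDirs flag lets callers stop brokers while keeping their
data on disk, e.g. (sketch; `brokers` is assumed to be a Seq[KafkaBroker] held by the caller):

    // Stop the brokers but keep their log directories so they can be recreated later.
    TestUtils.shutdownServers(brokers, deleteLogDirs = false)

    // Existing callers are unchanged: by default the log directories are still deleted.
    TestUtils.shutdownServers(brokers)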